Java中的IO與NIO
- 2020 年 3 月 15 日
- 筆記
前文開了高並發學習的頭,文末說了將會選擇NIO、RPC相關資料做進一步學習,所以本文開始學習NIO知識。
IO知識回顧
在學習NIO前,有必要先回顧一下IO的一些知識。
IO中的流
Java程序通過流(Stream)來完成輸入輸出。流是生產或者消費信息的抽象,流通過Java的輸入輸出與物理設備連接,儘管與之相連的物理設備不盡相同,但是所有的流的行為都是一樣的,所以相同的輸入輸出類的功能和方法適用於所有的外部設備。這意味着一個輸入流可以抽象多種類型的輸入,比如文件、鍵盤或者網絡套接字等,同樣的,一個輸出流也可以輸出到控制台、文件或者相連的網絡。
流的分類
從功能上可以將流分為輸入流和輸出流。輸入和輸出是相對於程序來說的,程序在使用數據時所扮演的角色有兩個:一個是源,一個是目的。若程序是數據的源,對外輸出數據,我們就稱這個數據流相對於程序來說是輸出流,若程序是數據的目的地,我們就稱這個數據流相對於程序來說是輸入流。
從結構上可以將流分為位元組流和字符流,位元組流以位元組為處理單位,字符流以字符為處理單位。
從角色上可以將流分為節點流和過濾流。從特定的地方讀寫的流叫做節點流,如磁盤或者一塊內存區域,而過濾流則以節點流作為輸入或者輸出,過濾流是使用一個已經存在的輸入流或者輸出流連接來創建的。
位元組流的輸入流和輸出流的基礎是InputStream和OutputStream,位元組流的輸入輸出操作由這兩個類的子類實現。字符流是Java1.1後新增的以字符為單位進行輸入和輸出的流,字符流的輸入輸出的基礎是Reader和Writer。
位元組流(byte stream)為處理位元組的輸入和輸出提供了方便的方法,比如使用位元組流讀取或者寫入二進制的數據。字符流(character stream)為字符的輸入和輸出提供了方便,採用了統一的編碼標準,因而可以國際化。值得注意的,在計算機的底層實現中,所有的輸入輸出其實都是以位元組的形式進行的,字符流只是為字符的處理提供了具體的方法。
輸入流
讀數據的邏輯為:
open a stream
while more information
read information
close the stream
忽略掉異常處理,相關的代碼實現大致如下:
InputStream input = new FileInputStream("c:\data\input-text.txt"); int data = input.read(); while(data != -1) { //do something with data... doSomethingWithData(data); data = input.read(); } input.close();
輸出流
寫數據的邏輯為:
open a stream
while more information
write information
close the stream
忽略掉異常處理,相關的代碼實現大致如下:
OutputStream output = new FileOutputStream("c:\data\output-text.txt"); while(hasMoreData()) { int data = getMoreData(); output.write(data); } output.close();
輸入流的類層次
輸出流的類層次
過濾流
在InputStream、OutputStream類的子類中,FilterInputStream和FilterOutputStream過濾流又派生出DataInputStream和DataOutputStream數據輸入輸出流等子類。
IO流的鏈接
Input Stream Chain:從外部文件往程序中寫入數據,所以第一步需要構造輸入流,同時也是節點流,FileInputStream,為了使這個流具備緩衝的特性,需要從節點流轉成過濾流,BufferedInputStream,僅僅有緩衝特性可能還不能滿足日常需求,還需要有讀取基本數據類型的特性,可以基於現有的過濾流再轉成其他的過濾流,DataInputStream,此時便可以方便的從文件中讀取數據;
Output Stream Chain:往外部文件中寫出數據,首先對於外部文件來說,是FileOutputStream,為了使這個流具備緩衝的特性,需要從節點流轉成過濾流,BufferedOutputStream,僅僅有緩衝特性可能還不能滿足日常需求,還需要有寫出基本數據類型的特性,可以基於現有的過濾流再轉成其他的過濾流,DataOutputStream,此時便可以方便的從寫各種數據類型;
Reader的類層次
Writer的類層次
至此,大概回顧了一下IO的部分基礎知識。
IO與裝飾模式
回到IO流的鏈接中,Input Stream Chain的一般代碼是這樣寫的:
InputStream input = new DataInputStream(new BufferedInputStream(new FileInputStream("c:\data\input-text.txt")))
Output Stream Chain的一般代碼是這樣寫的:
OutputStream output = new DataOutputStream(new BufferedOutputStream(new FileOutputStream("c:\data\output-text.txt")))
實際上,這種一個流與另一個流首尾相接,形成一個流管道的實現機制,其實就是裝飾模式的一種應用。
裝飾模式的套路
抽象構件角色(Component):給出一個抽象接口,以規範準備接收附加責任的對象
具體構件角色(ConcreteComponent):定義一個將要接收附加責任的類
裝飾角色(Decorator):持有一個構件(Component)對象引用,並定義一個與抽象構建接口一致的接口
具體裝飾角色(ConcreteDecorator):負責給構件對象貼上附加的責任
裝飾模式的代碼實現
讓我們來看下代碼:
public interface Component { void doSomething(); } public class ConcreteComponent implements Component{ @Override public void doSomething() { System.out.println("功能A"); } } public class Decorator implements Component{//重點1 定義抽象構件接口一致的接口 private Component component;//重點2 持有構件對象的引用 public Decorator(Component component) { this.component = component; } @Override public void doSomething() { component.doSomething(); } }
最後是具體裝飾角色代碼:
public class ConcreteDecorator1 extends Decorator{ public ConcreteDecorator1(Component component) { super(component); } @Override public void doSomething() { super.doSomething(); this.doAnotherThing(); } public void doAnotherThing() { System.out.println("功能B"); } } public class ConcreteDecorator2 extends Decorator{ public ConcreteDecorator2(Component component) { super(component); } @Override public void doSomething() { super.doSomething(); this.doAnotherThing(); } public void doAnotherThing() { System.out.println("功能C"); } }
對於客戶端來說只需要如下簡單的代碼,即可完成對構件對象ConcreteComponent的裝飾:
Component component = new ConcreteDecorator1(new ConcreteDecorator2(new ConcreteComponent())); component.doSomething();
IO中對應的裝飾模式解釋
DataInputStream、BufferedInputStream的角色就像上述的ConcreteDecorator1和ConcreteDecorator2,FilterInputStream類似Decorator,InputStream就是Component。
在JDK的源碼中:
public class FilterInputStream extends InputStream { protected volatile InputStream in; protected FilterInputStream(InputStream in) { this.in = in;} public int read() throws IOException { return in.read();}
再看下Decorator:
public class Decorator implements Component{//重點1 定義抽象構件接口一致的接口 private Component component;//重點2 持有構件對象的引用 public Decorator(Component component) { this.component = component; } @Override public void doSomething() { component.doSomething(); } }
至此,我們可以知道IO是如何體現的裝飾模式。
為什麼需要NIO
IO主要面向流數據,經常為了處理個別位元組或字符,就要執行好幾個對象層的方法調用。這種面向對象的處理方法,將不同的 IO 對象組合到一起,提供了高度的靈活性(IO裏面的裝飾模式),但需要處理大量數據時,卻可能對執行效率造成致命傷害。IO的終極目標是效率,而高效的IO往往又無法與對象形成一一對應的關係。高效的 IO 往往意味着您要選擇從A到B的最短路徑,而執行大量IO 操作時,複雜性毀了執行效率。傳統Java平台上的IO抽象工作良好,適應用途廣泛。但是當移動大量數據時,這些IO類可伸縮性不強,也沒有提供當今大多數操作系統普遍具備的常用IO功能,如文件鎖定、非塊IO、就緒性選擇和內存映射。這些特性對實現可伸縮性是至關重要的,對保持與非 Java 應用程序的正常交互也可以說是必不可少的,尤其是在企業應用層面,而傳統的Java IO 機制卻沒有模擬這些通用IO服務。Java規範請求#51(JSR 51, https://jcp.org/en/jsr/detail?id=51)包含了對高速、可伸縮 I/O 特性的詳盡描述,藉助這一特性,底層操作系統的IO性能可以得到更好發揮。 JSR 51 的實現,其結果就是新增類組合到一起,構成了 java.nio 及其子包,以及 java.util.regex 軟件包,同時現存軟件包也相應作了幾處修改。JCP 網站詳細介紹了 JSR 的運作流程,以及 NIO 從最初的提議到最終實現並發佈的演進歷程。隨着 Merlin(Jdk1.4) 的發佈,操作系統強大的IO特性終於可以藉助 Java 提供的工具得到充分發揮。論及IO性能,Java再也不遜於任何一款編程語言。
到這裡,我們知道Java NIO的目的是為了提高效率,充分利用操作系統提供的IO特性,所以為了應對更多的處理請求,我們需要新的IO模型(NIO)。
NIO的核心組件
這一節,我們將開始學習NIO。
上文中曾提到,NIO的三個核心組件:Selector、Channel和Buffer。用一張圖來抽象這三者之間的關係。
在沒有Java NIO之前,傳統的IO對於網絡連接的處理,通常會採用Thread Per Task,即一線程一連接的模式,這種模式在中小量業務處理時基本上是能滿足要求的,但是隨着連接數不斷增加,所創建的線程會不斷佔用內存空間,同時大量的線程也會帶來頻繁的上下文切換,CPU都用來操作上下文切換了,必然影響實際的業務處理。有了Java NIO之後,就可以花少量的線程來處理大量的連接,在上圖中,Selector是對Linux下的select/poll/epoll的包裝,Channel是可IO操作的硬件、文件、socket、其他程序組件的包裝,我們可以把Channel看成是網絡連接,網絡連接上主要有connect、accept、read和write等事件,Selector負責監聽這些事件的發生,一旦某個Channel上發生了某個事件,Thread切換到該Channel進行事件處理。類比到操作系統層面,如果是select/poll方式,應用進程對每個socket(Channel)的文件描述符順序掃描,查看是否就緒,阻塞在select(Selector)上,如果就緒,調用recvfrom,如果是epoll方式,就不是順序掃描,而是提供回調函數,當文件描述符就緒時,直接調用回調函數,進一步提高效率。圖中還有一個組件就是Buffer,它實際上就是一塊內存,底層是基於數組來實現的,一般和Channel成對出現,數據的讀寫都是通過Buffer來實現的。
接下來,依次來看下Selector、Channel和Buffer。
Selector模塊
Selector
Selector是一個多路傳輸的SelectableChannel對象。
Selector可以通過調用自身的open方法來進行創建,在open方法裏面是通過系統默認的selector provider來創建Selector的。當然也可以通過調用openSelector來自定義一個Selector。一個Selector將會一直保持open的狀態直到調用close方法。
一個可選擇的Channel對象註冊到Selector的行為是通過SelectionKey對象來體現的。Selector維護了三種SelectionKey的集合:
key set包含了當前的註冊到Selector的Channel對應的所有的keys,這些keys可以通過keys()返回;
selected-key set的每個成員都是相關的通道被選擇器(在前一個選擇操作
中)判斷為已經準備好的,並且包含於鍵的 interest 集合中的操作。這個集合通過 selectedKeys()方法返回。selected-key set是key set的子集;
cancelled-key set是一個被取消的Channel但是還未被註銷的key的集合,這個集合無法直接訪問,cancelled-key set也是key set的子集。
對於一個剛創建的Selector,上述三個集合默認都是空的。
通過調用Channel的register方法,一個新的key將會被添加到Selector的key set中。在執行selection操作時,cancelled keys將會從key set中移除,key set本身是不能被直接修改的。
不管是直接關閉Channel,還是調用SelectionKey的close方法,都會有一個key被添加到cancelled-key set中。在下一個selection操作中,取消一個key的動作都會導致其對應的Channel被註銷掉,同時這個key也會從Selector的key set中移除。
在執行selection操作時,keys會被添加到selected-key set中,通過Set的remove方法或者是Iterator的remove方法,key可以直接從selected-key set中移除,除此以外,沒有其他的方法可以達到這樣的效果。特別需要強調的是, 移除操作不是selection的副作用。key也不能直接添加到selected-key set中。
Selection
在每一次Selection操作中,keys有可能從selected-key set、key set或者cancelled-key set中增加或者刪除。Selection操作是通過select()、select(long)、selectNow()方法來執行的,一般包含如下三步:
1、cancelled-key set中的每一個key都可以從它所屬的key set中移除,同時它所屬的Channel也會註銷,這一步將會使cancelled-key set為空;
2、底層操作系統開始被查詢以更新剩餘的Channel通道的就緒狀態來執行這個key感興趣的事件,對於一個至少有一種這樣操作的Channel來說,下面2個動作將會被執行:
2.1 如果Channel的key不在selected-key set中,然後key會被增加到selected-key set中,同時它的就緒操作會被修改出來準確的標記出哪一個Channel已完成準備工作,任意之前的ready set的就緒信息將會被丟棄;
2.2 如果Channel的key在selected-key set中,同時它的就緒操作會被修改出來準確的標記出哪一個Channel已完成準備工作,任意之前的ready set的就緒信息將會被保留,換句話來說,這個底層操作系統的ready set將會按位寫入到當前的key的ready set。
如果所有的key set裏面的key一開始就沒有interest set,那麼不管是selected-key set還是它對應的就緒操作都不會被更新。
3、如果執行步驟2時有新的key加入到cancelled-key,則按步驟1進行處理。
這三個方法的本質區別就是選擇操作是否阻塞等待一個或多個通道準備就緒,或者等待了多長時間。
Concurrency
選擇器本身可以安全地供多個並發線程使用。但是,它們的key set不是。
在執行選擇操作時,選擇器在 Selector 對象上進行同步,然後是key set,最後是selected-key set,按照這樣的順序。cancelled-key set也在選擇過程的的第 1步和第 3 步之間保持同步。
在進行選擇操作時,對選擇器的interest sets所做的更改對該操作沒有影響,他們將在下一個選擇操作中看到。
選擇器的一個或多個鍵集中的鍵的存在並不表示該鍵有效或其通道已打開。如果其他線程有可能取消鍵或關閉通道,則應用程序代碼應謹慎同步並在必要時檢查這些條件。
線程會阻塞在select()或者select(long)方法上,如果其他線程想中斷這個阻塞,可以通過如下三種方式:
通過調用選擇器的wakeup方法;
通過調用選擇器的close方法;
通過調用被阻塞線程的interrupt方法,在這種情況下,將設置其中斷狀態,並調用選擇器的wakeup方法。
close方法以與選擇操作相同的順序在選擇器和所有三個鍵集上同步。
通常,選擇器的key和selected-key不能安全地供多個並發線程使用。如果此類線程可以直接修改這些集合之一,則應通過在集合本身上進行同步來控制訪問。 這些集合的迭代器方法返回的迭代器是快速失敗的:如果在創建迭代器之後修改了集合,則除了通過調用迭代器自己的remove方法之外,其他任何方式都會拋出java.util.ConcurrentModificationException。
如下為Selector提供的所有方法:
SelectionKey
表示SelectableChannel向Selector的註冊的令牌。
每次將Channel註冊到選擇器中時,都會創建一個選擇鍵。這個鍵一直有效,直到通過調用其cancel方法,關閉其通道或關閉其選擇器將其取消。取消鍵不會立即將其從選擇器中刪除,而是將其添加到選擇器的“取消鍵”集合中,以便在下一次選擇操作期間將其刪除。可以通過調用isValid方法來測試這個鍵是否有效性。
選擇鍵包含兩個表示為整數值的操作集。操作集的每個位表示鍵的通道支持的可選操作的類別。
興趣集確定下一次調用選擇器的選擇方法之一時,將測試那些操作類別是否準備就緒。使用創建鍵時給定的值來初始化興趣集,以後可以通過interestOps(int)方法對其進行更改。
準備集標識鍵的選擇器已檢測到鍵的通道已準備就緒的操作類別。 創建key時,準備集將初始化為零。它可能稍後會在選擇操作期間由選擇器更新,但無法直接更新。
選擇鍵的就緒集指示其通道已為某個操作類別做好了提示,但不是保證,此類類別中的操作可以由線程執行而不會導致線程阻塞。準備工作很可能在選擇操作完成後立即準確。外部事件和在相應通道上調用的I/O操作可能會使它不準確。
此類定義了所有已知的操作集位,但是精確地給定通道支持哪些位取決於通道的類型。SelectableChannel的每個子類都定義一個validOps()方法,該方法返回一個集合,該集合僅標識通道支持的那些操作。嘗試設置或測試鍵通道不支持的操作集位將導致run-time exception。
通常有必要將一些特定於應用程序的數據與選擇鍵相關聯,例如,一個對象代表一個更高級別協議的狀態並處理就緒通知,以實現該協議。因此,選擇鍵支持將單個任意對象附加到鍵上。可以通過attach方法附加對象,然後再通過attach方法檢索對象。
選擇鍵可安全用於多個並發線程。通常,讀取和寫入興趣集的操作將與選擇器的某些操作同步。確切地說,如何執行此同步取決於實現方式:在低性能的實現方式中,如果選擇操作已在進行中,則興趣組的讀寫可能會無限期地阻塞;在高性能實現中,讀取或寫入興趣集可能會短暫阻塞(如果有的話)。無論如何,選擇操作將始終使用該操作開始時當前的興趣設置值。
如下為SelectionKey提供的所有方法:
Channel模塊
Channel用來表示諸如硬件設備、文件、網絡套接字或程序組件之類的實體的開放連接,該實體能夠執行一個或多個不同的I/O操作(例如讀取或寫入)。I/O 可以分為廣義的兩大類別:File I/O和Stream I/O。那麼相應地有兩種類型的通道,它們是文件(file)通道和套接字(socket)通道。文件通道有一個FileChannel類,而套接字則有三個socket通道類:SocketChannel、 ServerSocketChannel和DatagramChannel。通道可以以阻塞(blocking)或非阻塞(nonblocking)模式運行。非阻塞模式的通道永遠不會讓調用的線程休眠。請求的操作要麼立即完成,要麼返回一個結果表明未進行任何操作。只有面向流的(stream-oriented)的通道,如 SocketChannel、ServerSocketChannel才能使用非阻塞模式。SocketChannel、ServerSocketChannel從 SelectableChannel引申而來。從 SelectableChannel 引申而來的類可以和支持有條件的選擇(readiness selectio)的選擇器(Selector)一起使用。將非阻塞I/O 和選擇器組合起來就可以使用多路復用 I/O(multiplexed I/O),也就是前面提到的select/poll/epoll。由於FileChannel不是從SelectableChannel類引申而來,所以FileChannel,也就是文件IO,是無法使用非阻塞模型的。
FileChannel
用於讀取、寫入、映射和操作文件的通道。
文件通道是可以連接到文件的SeekableByteChannel。它在文件中具有當前位置,支持查詢和修改。文件本身包含一個可變長度的位元組序列,可以讀取和寫入這些位元組,並且可以查詢其當前大小。當寫入位元組超過當前大小時,文件大小會增加; 文件的大小在被截斷時會減小。該文件可能還具有一些關聯的元數據,例如訪問權限、內容類型和最後修改時間等,此類未定義用於元數據訪問的方法。
除了熟悉的位元組讀取、寫入和關閉操作之外,此類還定義了以下特定於文件的操作:
可以以不影響通道當前位置的方式在文件中的絕對位置讀取或寫入位元組;
文件的區域可以直接映射到內存中。對於大文件,這通常比調用傳統的讀取或寫入方法要有效得多;
對文件所做的更新可能會被強制發送到基礎存儲設備,以確保在系統崩潰時不會丟失數據;
位元組可以從文件傳輸到其他通道,反之亦然,可以通過操作系統進行優化,將位元組快速傳輸到文件系統緩存或從文件系統緩存快速傳輸;
文件的區域可能被鎖定,以防止其他程序訪問;
文件通道可以安全地供多個並發線程使用。如Channel接口所指定的,close方法可以隨時調用。在任何給定時間,可能僅在進行涉及通道位置或可以更改其文件大小的一項操作。當第一個此類操作仍在進行時,嘗試啟動第二個此類操作的嘗試將被阻止,直到第一個操作完成。其他操作,尤其是採取明確立場的操作,可以同時進行。它們是否實際上執行取決於底層實現。
此類的實例提供的文件視圖保證與同一程序中其他實例提供的相同文件的其他視圖一致。但是,由於底層操作系統執行的緩存和網絡文件系統協議引起的延遲,此類實例提供的視圖可能與其他並發運行的程序所見的視圖一致,也可能不一致。 不管這些其他程序的編寫語言是什麼,以及它們是在同一台計算機上還是在其他計算機上運行,都是如此。任何此類不一致的確切性質都取決於底層操作系統如何實現。
通過調用此類定義的open方法來創建文件通道。也可以通過調用後續類的getChannel方法從現有的FileInputStream,FileOutputStream或RandomAccessFile對象獲得文件通道,該方法返回連接到相同基礎文件的文件通道。如果文件通道是從現有流或隨機訪問文件獲得的,則文件通道的狀態與其getChannel方法返回該通道的對象的狀態緊密相連。無論是顯式更改通道位置,還是通過讀取或寫入位元組來更改通道位置,都會更改原始對象的文件位置,反之亦然。通過文件通道更改文件的長度將更改通過原始對象看到的長度,反之亦然。通過寫入位元組來更改文件的內容將更改原始對象看到的內容,反之亦然。
在各個點上,此類都指定需要一個“可讀取”、“可寫入”或“可讀取和寫入”的實例。通過FileInputStream實例的getChannel方法獲得的通道將打開以供讀取。通過FileOutputStream實例的getChannel方法獲得的通道將打開以進行寫入。最後,如果實例是使用模式“ r”創建的,則通過RandomAccessFile實例的getChannel方法獲得的通道將打開以供讀取;如果實例是使用模式“ rw”創建的,則將打開以進行讀寫。打開的用於寫入的文件通道可能處於附加模式,例如,如果它是從通過調用FileOutputStream(File,boolean)構造函數並為第二個參數傳遞true創建的文件輸出流中獲得的。在這種模式下,每次調用相對寫入操作都會先將位置前進到文件末尾,然後再寫入請求的數據。位置的提升和數據的寫入是否在單個原子操作中完成取決於操作系統的具體實現。
SocketChannel
一個可選擇的Channel,用於面向流的連接socket。
通過調用此類的open方法來創建套接字通道。無法為任意現有的套接字創建通道。新建的套接字通道一打開,是處於尚未連接的狀態的。嘗試在未連接的通道上調用I/O操作將導致引發NotYetConnectedException。套接字通道可以通過調用其connect方法進行連接,連接後,套接字通道將保持連接狀態,直到關閉為止。套接字通道是否已連接可以通過調用其isConnected方法來確定。
套接字通道支持非阻塞連接,創建一個套接字通道,並可以通過connect方法啟動建立到遠程套接字的鏈接的過程,然後由finishConnect方法完成。可以通過調用isConnectionPending方法來確定連接操作是否正在進行。
套接字通道支持異步關閉,這類似於Channel類中指定的異步關閉操作。如果套接字的輸入端被一個線程關閉,而另一個線程在套接字通道的讀取操作中被阻塞,則阻塞線程中的讀取操作將完成而不會讀取任何位元組,並且將返回-1。如果套接字的輸出端被一個線程關閉,而另一個線程在套接字通道的寫操作中被阻塞,則被阻塞的線程將收到AsynchronousCloseException。
套接字選項是使用setOption方法配置的。套接字通道支持以下選項:
選項名稱 描述
SO_SNDBUF 套接字發送緩衝區的大小
SO_RCVBUF 套接字接收緩衝區的大小
SO_KEEPALIVE 保持連接活躍
SO_REUSEADDR 重複使用地址
SO_LINGER 如果有數據,則在關閉時徘徊(僅在阻塞模式下配置)
TCP_NODELAY 禁用Nagle算法
也可以支持其他(特定於實現的)選項。
套接字通道可以安全地供多個並發線程使用。它們支持並發讀取和寫入,儘管在任何給定時間最多可以讀取一個線程,並且最多可以寫入一個線程。connect和finishConnect方法彼此相互同步,並且在這些方法之一的調用正在進行時嘗試啟動讀取或寫入操作將被阻止,直到該調用完成為止。
ServerSocketChannel
一個可選擇的Channel,用於面向流的監聽socket。
通過調用此類的open方法可以創建服務器套接字通道。無法為任意現有的ServerSocket創建通道。新創建的服務器套接字通道一打開,是處於尚未綁定的狀態的。嘗試調用未綁定的服務器套接字通道的accept方法將導致引發NotYetBoundException。可以通過調用此類定義的bind方法之一來綁定服務器套接字通道。
套接字選項是使用setOption方法配置的。服務器套接字通道支持以下選項:
選項名稱 描述
SO_RCVBUF 套接字接收緩衝區的大小
SO_REUSEADDR 重複使用地址
也可以支持其他(特定於實現的)選項。
服務器套接字通道可安全用於多個並發線程。
Buffer模塊
Buffer
一個特定原始類型數據的容器。
緩衝區是特定原始類型元素的線性有限序列。除了其內容之外,緩衝區的基本屬性還包括capacity、limit和position:
緩衝區的capacity是它包含的元素數量。緩衝區的capacity永遠不會為負,也不會改變。
緩衝區的limit是不應讀取或寫入的第一個元素的索引。緩衝區的limit永遠不會為負,也永遠不會大於緩衝區的capacity。
緩衝區的position是下一個要讀取或寫入的元素的索引。緩衝區的position永遠不會為負,也不會大於limit。
對於每個非布爾基本類型,此類都有一個子類,即IntBuffer、ShortBuffer、LongBuffer、CharBuffer、ByteBuffer、DoubleBuffer和FloatBuffer。
Transferring data
此類的每個子類定義了get和put操作的兩類:
相對操作從當前位置開始讀取或寫入一個或多個元素,然後將該位置增加所傳送元素的數量。如果請求的傳輸超出limit,則相對的get操作將引發BufferUnderflowException,而相對的put操作將引發BufferOverflowException; 無論哪種情況,都不會傳輸數據。
絕對運算採用顯式元素索引,並且不影響位置。如果index參數超出limit,則絕對的get和put操作將引發IndexOutOfBoundsException。
當然,也可以通過始終相對於當前位置的通道的I/O操作將數據移入或移出緩衝區。
Marking and resetting
緩衝區的mark標記是在調用reset方法時將其position重置的索引。mark並非總是定義的,但是定義時,它永遠不會為負,也永遠不會大於position。如果定義了mark,則在將position或limit調整為小於mark的值時將mark標記丟棄。如果未定義mark,則調用reset方法將引發InvalidMarkException。
Invariants
對於mark,position,limit和capacity,以下不變式成立:
0 <=mark<= position <=limit<=capacity
新創建的緩衝區始終具有零位置和未定義的標記。 初始時limit可以為零,也可以是其他一些值,具體取決於緩衝區的類型及其構造方式。新分配的緩衝區的每個元素都初始化為零。
Clearing, flipping, and rewinding
除了訪問position,limit和capacity以及mark和reset的方法之外,此類還定義了以下對緩衝區的操作:
clear使緩衝區為新的通道讀取或相對put操作序列做好準備:將limit設置為capacity,並將位置position為零。
flip使緩衝區為新的通道寫入或相對get操作序列做好準備:將limit設置為當前position,然後將position設置為零。
rewind使緩衝區準備好重新讀取它已經包含的數據:保留limit不變,並將position設置為零。
Read-only buffers
每個緩衝區都是可讀的,但並非每個緩衝區都是可寫的。每個緩衝區類的變異方法都指定為可選操作,當對只讀緩衝區調用時,該方法將引發ReadOnlyBufferException。只讀緩衝區不允許更改其內容,但其mark,positoin和limit是可變的。緩衝區是否為只讀可以通過調用isReadOnly方法來確定。
Thread safety
緩衝區不能安全用於多個並發線程。如果一個緩衝區將由多個線程使用,則應通過適當的同步來控制對該緩衝區的訪問。
Invocation chaining
此類中沒有其他要返回值的方法被指定為返回在其上調用它們的緩衝區。這使得方法調用可以鏈接在一起,例如,語句序列:
b.flip();
b.position(23);
b.limit(42);
可以用一個更緊湊的語句代替
b.flip().position(23).limit(42);
基於NIO實現一個簡單的聊天程序
上述總結了NIO的基礎知識,知道了NIO可以處理文件IO和流IO(網絡IO),NIO最大的魅力還是在於網絡IO的處理,接下來將通過NIO實現一個簡單的聊天程序來繼續了解Java的NIO,這個簡單的聊天程序是一個服務端多個客戶端,客戶端相互之間可以實現數據通信。
服務端:
public class NioServer { //通過Map來記錄客戶端連接信息 private static Map<String,SocketChannel> clientMap = new HashMap<String,SocketChannel>(); public static void main(String[] args) throws Exception { //創建ServerSocketChannel 用來監聽端口 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //配置為非阻塞 serverSocketChannel.configureBlocking(false); //獲取服務端的socket ServerSocket serverSocket = serverSocketChannel.socket(); //監聽8899端口 serverSocket.bind(new InetSocketAddress(8899)); //創建Selector Selector selector = Selector.open(); //serverSocketChannel註冊到selector 初始時關注客戶端的連接事件 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true) { try { //阻塞 關注感興趣的事件 selector.select(); //獲取關注事件的SelectionKey集合 Set<SelectionKey> selectionKeys = selector.selectedKeys(); //根據不同的事件做不同的處理 selectionKeys.forEach(selectionKey -> { final SocketChannel client; try { //連接建立起來之後 開始監聽客戶端的讀寫事件 if (selectionKey.isAcceptable()) { //如何監聽客戶端讀寫事件 首先需要將客戶端連接註冊到selector //如何獲取客戶端建立的通道 可以通過selectionKey.channel() //前面只註冊了ServerSocketChannel 所以進入這個分支的通道必定是ServerSocketChannel ServerSocketChannel server = (ServerSocketChannel)selectionKey.channel(); //獲取到真實的客戶端 client = server.accept(); client.configureBlocking(false); //客戶端連接註冊到selector client.register(selector,SelectionKey.OP_READ); //selector已經註冊上ServerSocketChannel(關注連接)和SocketChannel(關注讀寫) //UUID代表客戶端標識 此處為業務信息 String key = "[" + UUID.randomUUID().toString() + "]"; clientMap.put(key,client); }else if (selectionKey.isReadable()) { //處理客戶端寫過來的數據 對於服務端是可讀數據 此處必定是SocketChannel client = (SocketChannel)selectionKey.channel(); //Channel不能讀寫數據 必須通過Buffer來讀寫數據 ByteBuffer byteBuffer = ByteBuffer.allocate(1024); //服務端讀數據到Buffer int count = client.read(byteBuffer); if(count > 0) { //讀寫轉換 byteBuffer.flip(); //寫數據到其他客戶端 Charset charset = Charset.forName("utf-8"); String receiveMessage = String.valueOf(charset.decode(byteBuffer).array()); System.out.println("client:" + client + receiveMessage); String sendKey = null; for(Map.Entry<String,SocketChannel> entry: clientMap.entrySet()) { if(client == entry.getValue()) { //拿到發送者的UUID 用於模擬客戶端的聊天發送信息 sendKey = entry.getKey(); break; } } //給所有的客戶端發送信息 for(Map.Entry<String,SocketChannel> entry: clientMap.entrySet()) { //拿到所有建立連接的客戶端對象 SocketChannel value = entry.getValue(); ByteBuffer writeBuffer = ByteBuffer.allocate(1024); //這個put操作是Buffer的讀操作 writeBuffer.put((sendKey + ":" + receiveMessage).getBytes()); //write之前需要讀寫轉換 writeBuffer.flip(); //寫出去 value.write(writeBuffer); } } } }catch (Exception ex) { ex.printStackTrace(); } }); //處理完成該key後 必須刪除 否則會重複處理報錯 selectionKeys.clear(); }catch (Exception e) { e.printStackTrace(); } } } }
客戶端:
public class NioClient { public static void main(String[] args) throws Exception { //創建SocketChannel 用來請求端口 SocketChannel socketChannel = SocketChannel.open(); //配置為非阻塞 socketChannel.configureBlocking(false); //創建Selector Selector selector = Selector.open(); //socketChannel註冊到selector 初始時關注向服務端建立連接的事件 socketChannel.register(selector,SelectionKey.OP_CONNECT); //向遠程發起連接 socketChannel.connect(new InetSocketAddress("127.0.0.1",8899)); while (true) { //阻塞 關注感興趣的事件 selector.select(); //獲取關注事件的SelectionKey集合 Set<SelectionKey> selectionKeys = selector.selectedKeys(); //根據不同的事件做不同的處理 for(SelectionKey selectionKey : selectionKeys) { final SocketChannel channel; if(selectionKey.isConnectable()) { //與服務端建立好連接 獲取通道 channel = (SocketChannel)selectionKey.channel(); //客戶端與服務端是否正處於連接中 if(channel.isConnectionPending()) { //完成連接的建立 channel.finishConnect(); //發送連接建立的信息 ByteBuffer writeBuffer = ByteBuffer.allocate(1024); //讀入 writeBuffer.put((LocalDateTime.now() + "連接成功").getBytes()); writeBuffer.flip(); //寫出 channel.write(writeBuffer); //TCP雙向通道建立 //鍵盤作為標準輸入 避免主線程的阻塞 新起線程來做處理 ExecutorService service = Executors.newSingleThreadExecutor(Executors.defaultThreadFactory()); service.submit(() -> { while (true) { writeBuffer.clear(); //IO操作 InputStreamReader inputStreamReader = new InputStreamReader(System.in); BufferedReader reader = new BufferedReader(inputStreamReader); String readLine = reader.readLine(); //讀入 writeBuffer.put(readLine.getBytes()); writeBuffer.flip(); //寫出 channel.write(writeBuffer); } }); } //客戶端也需要監聽服務端的寫出信息 所以需要關注READ事件 channel.register(selector,SelectionKey.OP_READ); }else if(selectionKey.isReadable()) { //從服務端讀取事件 channel = (SocketChannel)selectionKey.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); int count = channel.read(readBuffer); if(count > 0) { readBuffer.flip(); Charset charset = Charset.forName("utf-8"); String receiveMessage = String.valueOf(charset.decode(readBuffer).array()); System.out.println("client:" + receiveMessage); } } //處理完成該key後 必須刪除 否則會重複處理報錯 selectionKeys.clear(); } } } }
演示效果:
最後我們來總結一下:
1、IO面向流,NIO面向緩衝區,流只能單向傳輸,而緩衝區可以雙向傳輸,雙向傳輸的模型除了吞吐量得到增加外,這個模型也更接近操作系統和網絡的底層;
2、對於網絡IO,Selector和Channel組合在一起,實現了IO多路復用,這樣少量的線程也能處理大量的連接,適用於應對高並發大流量的場景;而對於文件IO,就談不上IO多路復用,但是FileChannel通過提供transferTo、transferFrom方法來減少底層拷貝的次數也能大幅提升文件IO的性能;
3、Buffer緩衝區用來存儲數據,除了沒有布爾類型外,其他基礎數據類型和Java裏面的基礎類型是一樣的,Buffer的核心屬性是position、limit和capacity,讀寫數據時是這幾個變量在不斷翻轉變化,但是其實這個設計並不優雅,Netty的ByteBuf提供讀寫索引分離的方式使實現更加優雅;
4、NIO的編程模式總結:
將Socket通道註冊到Selector中,監聽感興趣的事件;
當感興趣的事件就緒時,則會進去我們處理的方法進行處理;
每處理完一次就緒事件,刪除該選擇鍵(因為我們已經處理完了)。
參考資料:
http://ifeve.com/java-nio-all/
https://segmentfault.com/a/1190000014932357?utm_source=tag-newest
https://www.zhihu.com/question/29005375?sort=created
部分圖片截圖自某學習視頻,如有侵權請告知。