Java 異步 I/O
Java 中的異步 I/O 簡稱 AIO, A 即 Asynchronous。AIO 在 JDK1.7 時引入,基於操作系統提供的異步 I/O 通信模型,封裝了一些進行異步 I/O 操作的 API。
1. 異步 I/O 模型
學習 Java I/O 相關操作之前應該先了解其背後的 I/O 模型。Java 典型的基於流的文件操作和網絡通信都是基於同步阻塞 I/O 模型,JDK1.4 引入的 NIO 基於多路復用 I/O 模型,而 AIO 則基於異步 I/O 模型。在 Linux 操作系統中,異步模型從 I/O 設備讀取數據的流程如下圖所示。
- 應用程序向內核發起
aio_read
系統調用,傳遞緩存區信息,要讀取的文件信息; - 內核接收請求之後立即返回,應用程序未阻塞;
- 內核等待 CPU 或者 DMA 設備將數據從 I/O 設備複製到內核緩衝區;
- 內核將數據複製到用戶空間緩衝區;
- 內核發送一個信號給用戶程序,告知數據已複製完成;
- 應用程序處理用戶空間緩衝區中的數據。
2. 異步通道
基於異步 I/O 模型,JDK 提供了面向通道和緩衝區編程的 API。事實上,Java 為基於同步阻塞 I/O 模型的 「舊I/O」 和基於多路復用 I/O 模型的 「新I/O」 也提供了面向通道和緩衝區的 API。異步 I/O 的核心接口是 AsynchronousChannel
,這個接口有文件 I/O 的實現和網絡 I/O 的實現。
AsynchronousFileChannel
異步文件通道,用於異步操作文件;AsynchronousSocketChannel
異步套接字通道,用於 TCP 通信;AsynchronousServerSocketChannel
異步套接字監聽通道,作為服務端,接收 TCP 連接並創建AsynchronousSocketChannel
。
3 異步操作的兩種形式
異步通道 AsynchronousChannel
並沒有顯式提供一些必須實現的異步操作的抽象方法(事實上,這個接口僅提供了抽象方法 close()
),但是它在注釋中給出了異步操作 API 的兩種形式。一種異步操作是返回一個 Future
,另一種是往異步方法中傳遞一個回調函數,也就是一個 CompletionHandler
的對象,這兩種形式一般的異步編程框架中很常見。
3.1 返回 Future 形式
Future<V> operation(...)
void operation(... A attachment, CompletionHandler<V,? super A> handler)
其中 operation() 代表異步操作,比如從 I/O 設備中讀取數據 read,往 I/O 設備中寫入數據 write。
第一種是異步操作返回 Future<V>
,其中 V 是異步操作返回值的類型。開發人員可以調用 Future#isDone()
或者 Future#isCancelled()
查詢異步操作的狀態,也可以調用 Future#get()
,阻塞當前線程,直到異步操作完成。
讀取數據
Future<Integer> future = readChan.read(buff); // 異步讀取數據,並立即返回
future.get(); // 阻塞,等到異步操作完成,效率低
寫入數據
Future<Integer> future = writeChan.write(buff, position); // 異步寫入數據,並立即返回
len = future.get(); // 阻塞等待異步操作完成,效率低
當然,為了提高效率,開發過程中也可以不調用 Future#get() 方法來阻塞代碼,可以通過輪詢的方式檢查 Future 是否已經完成,完成之後再調用 Future#get() 來獲取結果。
3.2 回調形式
第二種操作是往異步函數中傳遞一個 A attachment
和 CompletionHandler<V, ? super A>
。其中 A 表示附件的類型,附件通常用來往 CompletionHandler
對象中傳入一些上下文信息,V 表示異步操作返回值類型。CompletionHandler
提供了兩個抽象方法:completed(V result, A attachment) 和 failed(Throwable t, A attachment)。當異步操作成功,completed 會被調用;當異步操作失敗,failed 會被調用。讀取到一個數據塊就會調用回調代碼,不會阻塞。
可以採用匿名內部類的方式去實現回調接口,也可以採用一般實現類,通過 attachment 傳遞上下文的形式實現回調邏輯。
匿名內部類方式
readChan.read(buff, 0, null, new CompletionHandler<>() { // 從位置 0 開始讀取數據,數據讀取到緩衝區 buff 中
long readSize = 0; // 已經讀取的位元組數
@Override
public void completed(Integer result, Object attachment) {
// 打印讀取到的數據
System.out.println(Thread.currentThread() + new String(buff.array(), 0, result));
try {
if ( (readSize = readSize + result) < readChan.size()) { // 已讀取位元組數少於文件總位元組數,繼續讀取
buff.clear(); // 將 buff 的 position 移動到起始位置,使其變為可寫狀態
readChan.read(buff, readSize, null, this); // 遞歸,繼續讀取,注意改變讀取位置,Handler 直接使用 this。
} else {
semaphore.release();
}
} catch (IOException e) {
e.printStackTrace();
}
}
傳遞上下文(attachment)方式
一般的調用邏輯。
Context context = new Context(); // 自定義類,存放上下文信息,上下文信息可根據需要設定
context.asyncFileChan = asyncFileChan;
context.buffer = ByteBuffer.allocate(4);
AsyncReadDataHandler callback = new AsyncReadDataHandler(); // 創建一個處理器對象
asyncFileChan.read(context.buffer, 0, context, callback); // 執行異步讀取數據
AsyncReadDataHandler 和 Context 的實現。
/** 定義上下文類 */
class Context {
AsynchronousFileChannel asyncFileChan;
ByteBuffer buffer;
}
/** 回調實現類 */
class AsyncReadDataHandler implements CompletionHandler<Integer, Context> {
private long readSize = 0;
private Semaphore semaphore = new Semaphore(0);
@Override
public void completed(Integer size, Context context) {
System.out.print(new String(context.buffer.array(), 0, size));
context.buffer.clear();
try {
if ( (readSize = readSize + context.buffer.limit()) < context.asyncFileChan.size()) {
// 還有數據,繼續讀。數據放入到 context.buffer 中,從 readSize 位置開始讀,附件是 context,處理器是當前對象
context.asyncFileChan.read(context.buffer, readSize, context, this);
} else {
semaphore.release();
}
} catch (IOException e) {
e.printStackTrace();
semaphore.release();
}
}
@Override
public void failed(Throwable cause, Context context) {
cause.printStackTrace();
semaphore.release();
}
// 等待結束
public void waitForEnd() throws InterruptedException {
semaphore.acquire();
}
}
4. 異步文件通道
異步文件通道和文件通道的大部分 API 相同,不同的是異步文件通道支持異步讀取和寫入數據。這裡僅介紹這兩類異步 API,其它 API 以及內存映射相關的內容可以參考Java NIO 文件通道 FileChannel 用法。
public class AsyncFileChannel {
public static void main(String[] args) throws IOException, InterruptedException, ExecutionException {
Path path = Paths.get("data.txt"); // 準備一些數據
/* 異步寫入數據 */
byte[] data = "This is an example of AsynchronousFileChannel".getBytes(StandardCharsets.UTF_8);
ByteBuffer buff = ByteBuffer.allocate(4); // 分配一個大小為 4 的位元組緩衝區
AsynchronousFileChannel writeChan = AsynchronousFileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
long position = 0; // 記錄寫入數據在文件中的起始位置
for (int i = 0; i<data.length; i+=buff.capacity()) {
buff.put(data, i, Math.min(buff.capacity(), data.length - i)); // 將數據放入緩衝區
buff.flip(); // 將緩衝區變為讀模式
int len; // 記錄成功寫入的位元組長度
while (buff.hasRemaining()) {
Future<Integer> future = writeChan.write(buff, position); // 異步寫入數據,並立即返回
len = future.get(); // 阻塞等待異步操作完成,效率低
position += len; // 更新 position 位置
}
buff.clear(); // 清空緩衝區,將緩衝區變為寫模式
}
writeChan.force(false);
writeChan.close();
/* 異步讀取數據 */
Semaphore semaphore = new Semaphore(0);
AsynchronousFileChannel readChan = AsynchronousFileChannel.open(path, StandardOpenOption.READ); // 打開一個異步文件通道
readChan.read(buff, 0, null, new CompletionHandler<>() { // 從位置 0 開始讀取數據,數據讀取到緩衝區 buff 中
long readSize = 0; // 已經讀取的位元組數
@Override
public void completed(Integer result, Object attachment) {
// 打印讀取到的數據
System.out.println(Thread.currentThread() + new String(buff.array(), 0, result));
try {
if ( (readSize = readSize + result) < readChan.size()) { // 已讀取位元組數少於文件總位元組數,繼續讀取
buff.clear(); // 將 buff 的 position 移動到起始位置,使其變為可寫狀態
readChan.read(buff, readSize, null, this); // 遞歸,繼續讀取,注意改變讀取位置,Handler 直接使用 this。
} else {
semaphore.release();
}
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
});
// 主線程等待文件數據讀取結束。
semaphore.acquire();
}
}
5. 異步套接字通道與異步套接字監聽通道
面向通道的 Socket 通信需要有客戶端和服務端的參與,涉及到監聽套接字連接的通道和收發數據的套接字通道。服務端通過套接字監聽通道接收客戶端的連接,併產生一個套接字通道與客戶端通信,而客戶端需要主動創建套接字通道去連接服務端。在 Java 實現的同步阻塞 I/O 和多路復用 I/O 中,套接字通道和套接字監聽通道分別是 SocketChannel 和 ServerSocketChannel,而 AIO 中的通道則分別是 AsynchronousSocketChannel 和 AsynchronousServerSocketChannel。
5.1 TCP 服務端 —— 異步套接字監聽通道
異步套接字監聽通道 AsynchronousServerSocketChannel 的異步操作是異步監聽連接,調用 accept 方法之後會立即返回,異步操作的結果是一個異步套接字通道 AsynchronousSocketChannel;連接建立成功之後,服務端即可與客戶端進行通信。
異步套接字監聽通道一次性只能夠接受一個連接,一個連接接受成功之後再接收下一個,連續接收連接會拋出 AcceptPendingException。例如,下面兩段代碼將會拋出異常。
serverSocketChannel.accept(null, handler); // 附件為空,傳入一個 CompletionHandler 實現類的對象。
serverSocketChannel.accept(null, handler);
或者
future = serverSocketChannel.accept();
future = serverSocketChannel.accept();
正確的使用方式是
serverSocketChannel.accept(null, new CompletionHandler<>() { // 異步建立連接
@Override
public void completed(AsynchronousSocketChannel socketChannel, Object attachment) { // 成功建立連接
serverSocketChannel.accept(null, this); // 接收下一個
...... 其它邏輯
}
}
或者
future = serverSocketChannel.accept();
future.get(); // 阻塞,等待前一個連接完成
future = serverSocketChannel.accept();
下面是一個 TCP 服務端異步套接字監聽通道的一段完整示例代碼。服務端接收來自於客戶端的連接,連接成功之後繼續等待下一個;然後以異步的方式接收客戶端發來的數據並打印出來。這裡可能會有一個疑問,「接收下一個連接」 處對 accept 方法的調用算遞歸嗎?長時間運行會不會造成 Stack Overflow?嚴格來講,這不算是遞歸,也不會造成棧溢出錯誤,因為外層的 accept 方法會立即返回,釋放虛擬機棧的空間,棧的深度不會超過虛擬機允許的最大深度。
public class AsyncServerSocketChannel {
public static void main(String[] args) throws IOException, InterruptedException {
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 9090));
serverSocketChannel.accept(null, new CompletionHandler<>() { // 異步建立連接
@Override
public void completed(AsynchronousSocketChannel socketChannel, Object attachment) { // 成功建立連接
serverSocketChannel.accept(null, this); // 接收下一個連接
ByteBuffer buf = ByteBuffer.allocate(8); // 分配一個 8 位元組的緩衝區
socketChannel.read(buf, null, new CompletionHandler<>() { // 異步讀取數據
@Override
public void completed(Integer len, Object attachment) { // 成功讀取到數據
if (-1 != len) { // 客戶端未關閉通道
System.out.print(new String(buf.array(), 0, len));
buf.clear(); // 清除緩衝區,為下一次寫入數據做準備
socketChannel.read(buf, null, this); // 繼續讀取下一批數據
} else {
try {
socketChannel.close(); // 關閉通道
} catch (IOException e) {
e.printStackTrace();
}
System.out.println();
}
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
});
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
});
new Semaphore(0).acquire(); // 阻塞主線程
}
}
5.2 TCP 客戶端 —— 異步套接字通道
在上面的異步套接字監聽通道的例子中其實已經包含了異步套接字通道讀取數據的方式,下面給出的例子是往異步套接字通道寫入數據(即向 TCP 服務端發送數據)的例子。
回調操作方式。
public class AsyncSocketChannel {
public static void main(String[] args) throws IOException, InterruptedException {
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open(); // 打開一個異步的 Socket 通道
InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1", 9090); // 服務端地址
Semaphore semaphore = new Semaphore(0); // 定義一個信號量,用來確保主線程等待 Socket 將數據完成再退出
socketChannel.connect(serverAddress, null, new CompletionHandler<>() {
@Override
public void completed(Void result, Object attachment) { // 成功建立連接之後觸發
String msg = "Hello, this is a TCP Client.";
ByteBuffer data = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)); // 將要發送的數據放到緩衝區中
socketChannel.write(data, null, new CompletionHandler<>() { // 往通道中寫(發)數據給服務端
@Override
public void completed(Integer result, Object attachment) { // 成功寫完一批數據後觸發
if (data.hasRemaining()) { // 緩衝區還有數據
socketChannel.write(data, null, this); // 繼續(寫)發給服務端
} else { // 緩衝區數據已經全部發送給了客戶端
try {
socketChannel.shutdownOutput(); // 關閉輸出,服務端調用 read 時收到返回值 -1
socketChannel.close(); // 關閉通道
semaphore.release(); // 釋放信號量許可,讓主線程可以繼續往下走
} catch (IOException e) {
e.printStackTrace();
}
}
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
});
}
@Override
public void failed(Throwable exc, Object attachment) {
exc.printStackTrace();
}
});
semaphore.acquire(); // 等到異步線程工作完成
}
}
Future 操作方式。
/**
* 異步 Socket,返回 Future。
*/
class AsyncSocketChannel2 {
public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
InetSocketAddress serverAddress = new InetSocketAddress("127.0.0.1", 9090);
Future<Void> connect = socketChannel.connect(serverAddress); // 連接到服務端
connect.get(); // 阻塞,等待連接建立成功
byte[] data = "AsyncSocketChannel with Future.".getBytes(StandardCharsets.UTF_8);
ByteBuffer buf = ByteBuffer.allocate(4);
for (int i = 0; i < data.length; i += buf.capacity()) {
buf.put(data, i, Math.min(buf.capacity(), data.length - i));
buf.flip(); // 使緩衝區變為可讀狀態
while (buf.hasRemaining()) { // 緩衝區中還有數據(緩衝區的數據不一定能夠一次性就被發送出去)
Future<Integer> future = socketChannel.write(buf); // 非阻塞發送數據
future.get(); // 阻塞等待數據發送成功
}
buf.clear(); // 清空緩衝區,變為可寫狀態
}
socketChannel.shutdownOutput();
socketChannel.close();
}
}
6. 小結
Java AIO 的操作模式和一般的異步代碼編寫模式類似,都支持返回 Future 的操作和回調操作;但這並不是 AIO 的核心,基於其它 I/O 模型(如:同步阻塞I/O模型)也可以提供類似的異步操作 API。AIO 的厲害之處在於它調用了操作系統內核提供的異步 I/O 接口,提高了 I/O 的效率。
無論是訪問文件還是網絡,AIO 的操作步驟和一般基於通道的 I/O 操作步驟類似,包括打開通道,關閉通道,接收連接,讀取(接收)數據,寫入(發送)數據。這些步驟當中,讀/寫數據以及接收連接是異步的,其它步驟都是同步。這一點與一些 API 全盤異步的框架(如 Vert.X)不同。
7. 參考
[1] I/O Multiplexing
[2] Java NIO 緩衝區 Buffer
[3] Java NIO 文件通道 FileChannel 用法
[4] Java NIO 通道 Channel