深入學習Netty(3)——傳統AIO編程
前言
之前已經整理過了BIO、NIO兩種I/O的相關博文,每一種I/O都有其特點,但相對開發而言,肯定是要又高效又簡單的I/O編程才是真正需要的,在之前的NIO博文(深入學習Netty(2)——傳統NIO編程)中就已經介紹過NIO編程的缺點(相比較而言的缺點:同步非阻塞,需要單獨開啟線程不斷輪詢),所以才會有真正的異步非阻塞I/O出現,這就是此篇博文需要介紹的AIO編程。
參考資料《Netty In Action》、《Netty權威指南》(有需要的小夥伴可以評論或者私信我)
博文中所有的代碼都已上傳到Github,歡迎Star、Fork
感興趣可以先學習相關博文:
一、NIO 2.0與AIO編程
JDK 1.7升級了NIO類庫,升級後的NIO類庫稱之為NIO 2.0,Java提供了異步文件I/O操作,同時提供了與UNIX網絡編程事件驅動I/O對應的AIO。
NIO 2.0的異步套接字通道是真正的異步非阻塞I/O,對應有UNIX網絡編程中的事件驅動I/O(AIO),相比較NIO,它不需要通過Selector對註冊的通道進行輪詢操作即可實現異步讀寫,簡化了NIO的編程模型。
NIO 2.0提供了新的異步通道的概念,異步通道提供了以下兩種方式獲取操作結果:
- 通過juc.Futrue類來表示異步操作的結果。
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open(); InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", 8080); Future<Void> connect = socketChannel.connect(inetSocketAddress); while (!connect.isDone()) {
Thread.sleep(10); }
- 在異步操作的時候傳入java.nio.channels。實現CompletionHandler接口complete()的方法作為操作完成回調。
private class MyCompletionHandler implements CompletionHandler<Integer, ByteBuffer> { @Override public void completed(Integer result, ByteBuffer attachment) { // TODO 回調後業務操作 } @Override public void failed(Throwable t, ByteBuffer attachment) { t.printStackTrace(); }
二、AIO服務端
(1)服務端AIO異步處理任務AsyncTimeServerHandler:
- 創建異步服務通道並監聽端口
- 異步監聽客戶端連接
/** * 服務端AIO異步處理任務 * -創建異步服務通道監聽端口 * -監聽客戶端連接 */ public class AsyncTimeServerHandler implements Runnable{ private int port; CountDownLatch latch; AsynchronousServerSocketChannel asynchronousServerSocketChannel; public AsyncTimeServerHandler(int port) { this.port = port; try { // 創建異步的服務通道asynchronousServerSocketChannel, 並bind監聽端口 asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open(); asynchronousServerSocketChannel.bind(new InetSocketAddress(port)); System.out.println("The time server is start in port : " + port); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { // countDownLatch沒有count減一,所以導致一直阻塞 latch = new CountDownLatch(1); doAccept(); try { // 防止執行操作線程還未結束,服務端線程就退出,程序不退出的前提下,才能夠讓accept繼續可以回調接受來自客戶端的連接 // 實際開發過程中不需要單獨開啟線程去處理AsynchronousServerSocketChannel latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } /** * 接收客戶端的連接 * 參數CompletionHandler類型的handler實例來接收accept操作成功的通知消息 */ public void doAccept() { asynchronousServerSocketChannel.accept(this, new AcceptCompletionHandler()); } }
(2)服務端連接異步回調處理器AcceptCompletionHandler:異步處理客戶端連接完成後的操作
/** * 客戶端連接異步處理器 * completed()方法完成回調logic * failed()方法完成失敗回調logic */ public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncTimeServerHandler> { /** * 調用該方法表示客戶端已經介接入成功 * 同時再accept接收新的客戶端連接 * @param result * @param attachment */ @Override public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment) { // 此時還要繼續調用accept方法是因為,completed方法表示上一個客戶端連接完成,而下一個新的客戶端需要連接 // 如此形成新的循環:每接收一個客戶端的成功連接之後,再異步接收新的客戶端連接 attachment.asynchronousServerSocketChannel.accept(attachment, this); // 預分配1M的緩衝區 ByteBuffer buffer = ByteBuffer.allocate(1024); // 調用read方法異步讀,傳入CompletionHandler類型參數異步回調讀事件 result.read(buffer, buffer, new ReadCompletionHandler(result)); } @Override public void failed(Throwable exc, AsyncTimeServerHandler attachment) { exc.printStackTrace(); // 讓服務線程不再阻塞 attachment.latch.countDown(); } }
(3)服務端read事件異步回調處理器ReadCompletionHandler:異步回調處理客戶端請求數據
/** * 服務端read事件異步處理器 * completed異步回調處理客戶端請求數據 */ public class ReadCompletionHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel channel; public ReadCompletionHandler(AsynchronousSocketChannel channel) { if (this.channel == null) { this.channel = channel; } } @Override public void completed(Integer result, ByteBuffer attachment) { attachment.flip(); // 根據緩衝區的可讀位元組創建byte數組 byte[] body = new byte[attachment.remaining()]; attachment.get(body); try { // 解析請求命令 String req = new String(body, "UTF-8"); System.out.println("The time server receive order : " + req); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(req) ? new java.util.Date( System.currentTimeMillis()).toString() : "BAD ORDER"; // 發送當前時間給客戶端 doWrite(currentTime); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } private void doWrite(String currentTime) { if (currentTime != null && currentTime.trim().length() > 0) { byte[] bytes = (currentTime).getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); // write異步回調,傳入CompletionHandler類型參數 channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { // 如果沒有發送完成,繼續發送 if (buffer.hasRemaining()) { channel.write(buffer, buffer, this); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { // TODO 只要是I/O異常就需要關閉鏈路,釋放資源 } } }); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (IOException e) { e.printStackTrace(); // TODO 只要是I/O異常就需要關閉鏈路,釋放資源 } } }
(4)服務端啟動TimeServer
/** * AIO 異步非阻塞服務端 * 不需要單獨開線程去處理read、write等事件 * 只需要關注complete-handlers中的回調completed方法 */ public class TimeServer { public static void main(String[] args) throws IOException { int port = 8086; AsyncTimeServerHandler timeServer = new AsyncTimeServerHandler(port); new Thread(timeServer, "AIO-AsyncTimeServerHandler").start(); } }
(5)啟動服務端
服務端Console:
使用命令netstat查看8086端口是否監聽
三、AIO客戶端
(1)客戶端AIO異步回調處理任務:
- 打開AsynchronousSocketChannel通道,連接服務端
- 發送服務端指令
- 回調處理服務端應答
/** * 客戶端AIO異步回調處理任務 * -打開AsynchronousSocketChannel通道,連接服務端 * -發送服務端指令 * -回調處理服務端應答 */ public class AsyncTimeClientHandler implements CompletionHandler<Void, AsyncTimeClientHandler>, Runnable { private AsynchronousSocketChannel client; private String host; private int port; private CountDownLatch latch; public AsyncTimeClientHandler(String host, int port) { this.host = host; this.port = port; try { client = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { latch = new CountDownLatch(1); client.connect(new InetSocketAddress(host, port), this, this); try { // 防止異步操作都沒完成,連接線程就結束退出 latch.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } try { client.close(); } catch (IOException e) { e.printStackTrace(); } } /** * 發送請求完成異步回調 * @param result * @param attachment */ @Override public void completed(Void result, AsyncTimeClientHandler attachment) { byte[] req = "QUERY TIME ORDER".getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); client.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { if (buffer.hasRemaining()) { client.write(buffer, buffer, this); } else { ByteBuffer readBuffer = ByteBuffer.allocate(1024); // 回調服務端應答消息 client.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String body; try { body = new String(bytes, "UTF-8"); System.out.println("Now is : " + body); // 服務端應答完成後,連接線程退出 latch.countDown(); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { client.close(); // 防止線程一直阻塞 latch.countDown(); } catch (IOException e) { // ingnore on close } } }); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { client.close(); latch.countDown(); } catch (IOException e) { // ingnore on close } } }); } @Override public void failed(Throwable exc, AsyncTimeClientHandler attachment) { exc.printStackTrace(); try { client.close(); latch.countDown(); } catch (IOException e) { e.printStackTrace(); } } }
(2)客戶端TimeClient
/** * AIO 異步非阻塞 客戶端 * 不需要單獨開線程去處理read、write等事件 * 只需要關注complete-handlers中的回調completed方法 */ public class TimeClient { public static void main(String[] args) { int port = 8086; new Thread(new AsyncTimeClientHandler("127.0.0.1", port), "AIO-AsyncTimeClientHandler").start(); } }
(3)啟動客戶端
客戶端Console:
服務端Console:
四、總結
服務端通過countDownLatch一直阻塞
由代碼實踐我們可知:
JDK底層通過ThreadPoolExecutor執行回調通知,異步回調通知類由sun.nio.ch.AsynchronousChannelGroupImpl實現,然後將任務提交到該線程池以處理I/O事件,並分派給completion-handlers ,該隊列消耗對組中通道執行的異步操作的結果。
異步SocketChannel是被動執行,不需要單獨像NIO編程那樣單獨創建一個獨立的I/O線程處理讀寫操作,都是由JDK底層的線程池負責回調並驅動讀寫操作的。所以基於NIO 2.0的新的異步非阻塞相比較NIO編程要簡單,這兩區別在於:
- 在NIO中等待IO事件由我們註冊的selector來完成,在感興趣的事情來了,我們的線程來accept.read.write.connect…解析,解析完後再交由業務邏輯處理。
- 而在在異步IO(AIO、NIO 2.0)中等待IO事件同樣為accept,read,write,connect,但數據處理交由系統完成,我們需要做的就是在completionHandlers中處理業務邏輯回調即可。