盤一盤 NIO (二)—— Channel解析
- 2019 年 10 月 3 日
- 筆記
Channel是個啥?
Channel,顧名思義,它就是一個通道。NIO中的所有IO都是從 Channel 開始的。
Channel通道和流非常類似,主要有以下幾點區別:
1、流是單向的,通道是雙向的,可讀可寫。
2、流讀寫是阻塞的,通道可以非同步讀寫。
3、流中的數據可以選擇性的先讀到快取中,通道的數據總是要先讀到一個快取Buffer中,或從快取Buffer中寫入。
繼承關係圖
Channel有兩種分類方式。一種是按同步Channel和非同步Channel劃分,還有一種是按功能劃分。
後面我們會主要講解Channel的幾個重要實現,如下所示:
FileChannel: 從文件中讀寫數據
DatagramChannel: 通過UDP讀寫網路中的
SocketChannel: 通過TCP讀寫網路中的,一般是客戶端實現
ServerSocketChannel: 允許我們監聽TCP鏈接請求,每個請求會創建會一個SocketChannel,一般是伺服器實現

介面方法
public interface Channel extends Closeable { // 判斷Channel的開關狀態 public boolean isOpen(); // 關閉此Channel public void close() throws IOException; }
FileChanel
FileChannel是一個連接到文件的通道,可以通過文件通道讀寫文件。眾所周知文件通道總是阻塞式的,因此FileChannel無法設置為非阻塞模式。
FileChannel中重要方法,read、write通過其實現類FileChannelImpl實現
// 從這個通道讀入一個位元組序列到給定的緩衝區 public abstract int read(ByteBuffer dst) throws IOException; // 從這個通道讀入指定開始位置和長度的位元組序列到給定的緩衝區 public abstract long read(ByteBuffer[] dsts, int offset, int length) throws IOException; // 從這個通道讀入一個位元組序列到給定的緩衝區 public final long read(ByteBuffer[] dsts) throws IOException { return read(dsts, 0, dsts.length); } // 從給定的緩衝區寫入位元組序列到這個通道 public abstract int write(ByteBuffer src) throws IOException; // 從給定緩衝區的子序列向該信道寫入位元組序列 public abstract long write(ByteBuffer[] srcs, int offset, int length) throws IOException; // 從給定的緩衝區寫入位元組序列到這個通道 public final long write(ByteBuffer[] srcs) throws IOException { return write(srcs, 0, srcs.length); }
FileChannelImpl中read、write方法實現
// read 方法實現 public int read(ByteBuffer var1) throws IOException { this.ensureOpen(); if (!this.readable) { throw new NonReadableChannelException(); } else { synchronized(this.positionLock) { int var3 = 0; int var4 = -1; try { this.begin(); var4 = this.threads.add(); if (!this.isOpen()) { byte var12 = 0; return var12; } else { do { var3 = IOUtil.read(this.fd, var1, -1L, this.nd); } while(var3 == -3 && this.isOpen()); int var5 = IOStatus.normalize(var3); return var5; } } finally { this.threads.remove(var4); this.end(var3 > 0); assert IOStatus.check(var3); } } } } // write方法實現 public int write(ByteBuffer var1) throws IOException { this.ensureOpen(); if (!this.writable) { throw new NonWritableChannelException(); } else { synchronized(this.positionLock) { int var3 = 0; int var4 = -1; byte var5; try { this.begin(); var4 = this.threads.add(); if (this.isOpen()) { do { var3 = IOUtil.write(this.fd, var1, -1L, this.nd); } while(var3 == -3 && this.isOpen()); int var12 = IOStatus.normalize(var3); return var12; } var5 = 0; } finally { this.threads.remove(var4); this.end(var3 > 0); assert IOStatus.check(var3); } return var5; } } }
IOUtil中read、write實現:
// read方法實現 static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException { if (var1.isReadOnly()) { throw new IllegalArgumentException("Read-only buffer"); } else if (var1 instanceof DirectBuffer) { return readIntoNativeBuffer(var0, var1, var2, var4); } else { // 申請一塊和快取同大小的ByteBuffer var5 ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining()); int var7; try { // 讀取數據到快取,底層由NativeDispatcher的read實現。 int var6 = readIntoNativeBuffer(var0, var5, var2, var4); var5.flip(); if (var6 > 0) { // 把數據讀取到var1(用戶定義的快取,在jvm中分配記憶體) var1.put(var5); } var7 = var6; } finally { Util.offerFirstTemporaryDirectBuffer(var5); } return var7; } } // write 方法實現 static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException { if (var1 instanceof DirectBuffer) { return writeFromNativeBuffer(var0, var1, var2, var4); } else { int var5 = var1.position(); int var6 = var1.limit(); assert var5 <= var6; int var7 = var5 <= var6 ? var6 - var5 : 0; // 申請一塊ByteBuffer,大小為byteBuffer中的limit - position ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7); int var10; try { // 複製byteBuffer中的數據 var8.put(var1); var8.flip(); var1.position(var5); // 把數據寫入到文件,底層由NativeDispatcher的write實現 int var9 = writeFromNativeBuffer(var0, var8, var2, var4); if (var9 > 0) { var1.position(var5 + var9); } var10 = var9; } finally { Util.offerFirstTemporaryDirectBuffer(var8); } return var10; } }
小結
1、文件通道不能直接創建,只能通過InputStream、OutputStream或RandomAccessFile等創建對應的文件通道
2、文件通道FileChannel從緩衝區中讀取數據,使用read方法
3、文件通道FileChannel的read方法只能讀ByteBuffer緩衝區
DatagramChannel
DatagramChannel,使用UDP協議來進行傳輸。由於不需要建立連接,其實沒有客戶端服務端的概念,為了便於理解,我們定義其中一端為客戶端,一端為服務端
客戶端
public static void main(String[] args) throws Exception { // 打開DatagramChannel DatagramChannel datagramChannel = DatagramChannel.open(); // 綁定一個埠發送數據 ByteBuffer byteBuffer = ByteBuffer.wrap("A".getBytes()); int byteSent = datagramChannel.send(byteBuffer, new InetSocketAddress("127.0.0.1", 8000)); System.out.println("Byte sent is: " + byteSent); }
服務端
public static void main(String[] args) throws Exception { // 打開DatagramChannel,綁定一個埠 DatagramChannel datagramChannel = DatagramChannel.open(); datagramChannel.socket().bind(new InetSocketAddress(8000)); while (true) { // 接收數據並輸出 ByteBuffer byteBuffer = ByteBuffer.allocate(1024); datagramChannel.receive(byteBuffer); byteBuffer.flip(); if(byteBuffer.hasRemaining()) { System.out.print((char) byteBuffer.get()); } } }
ServerSocketChannel和SocketChannel
ServerSocketChannel是一個可以監聽新進來的TCP連接的通道。ServerSocketChannel本身不具備傳輸數據的能力,而只是負責監聽傳入的連接和創建新的SocketChannel。
SocketChannel是一個連接到TCP網路套接字的通道。通常SocketChannel在客戶端向伺服器發起連接請求,每個SocketChannel對象創建時都關聯一個對等的Socket對象。同樣SocketChannel也可以運行在非阻塞模式下。
可以通過以下2種方式創建SocketChannel:
1、打開一個SocketChannel並連接到互聯網上的某台伺服器
2、一個新連接到達ServerSocketChannel時,會創建一個SocketChannel
服務端
public static void main(String[] args) throws Exception { // 服務端首先打開ServerSocketChannel,然後綁定一個埠 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress(8000)); // 服務端ServerSocketChannel收到連接請求時,返回一個SocketChannel對象 SocketChannel socketChannel = serverSocketChannel.accept(); while(true) { // 把數據從channel中讀出來,然後寫入到buffer中然後列印 ByteBuffer buffer = ByteBuffer.allocate(128); socketChannel.read(buffer); buffer.flip(); if(buffer.hasRemaining()) { System.out.println((char) buffer.get()); } } }
客戶端
public static void main(String[] args) throws Exception { // 客戶端建立連接的過程,首先打開SocketChannel,然後連接到服務端 SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress("127.0.0.1", 8000)); //連接是否建立成功 boolean isConnect = socketChannel.isConnected(); while (true) { // 通過buffer,向channel中寫入數據 ByteBuffer buffer = ByteBuffer.allocate(128); buffer.clear(); buffer.put(("A").getBytes()); buffer.flip(); socketChannel.write(buffer); Thread.sleep(1000); } }