盤一盤 NIO (二)—— Channel解析

  • 2019 年 10 月 3 日
  • 筆記

Channel是個啥?

Channel,顧名思義,它就是一個通道。NIO中的所有IO都是從 Channel 開始的。
Channel通道和流非常類似,主要有以下幾點區別:
1、流是單向的,通道是雙向的,可讀可寫。
2、流讀寫是阻塞的,通道可以非同步讀寫。
3、流中的數據可以選擇性的先讀到快取中,通道的數據總是要先讀到一個快取Buffer中,或從快取Buffer中寫入。
 

繼承關係圖

Channel有兩種分類方式。一種是按同步Channel和非同步Channel劃分,還有一種是按功能劃分。
後面我們會主要講解Channel的幾個重要實現,如下所示:
FileChannel: 從文件中讀寫數據
DatagramChannel: 通過UDP讀寫網路中的
SocketChannel: 通過TCP讀寫網路中的,一般是客戶端實現
ServerSocketChannel: 允許我們監聽TCP鏈接請求,每個請求會創建會一個SocketChannel,一般是伺服器實現
 
 

介面方法

public interface Channel extends Closeable {        // 判斷Channel的開關狀態      public boolean isOpen();        // 關閉此Channel      public void close() throws IOException;    }

 

FileChanel

FileChannel是一個連接到文件的通道,可以通過文件通道讀寫文件。眾所周知文件通道總是阻塞式的,因此FileChannel無法設置為非阻塞模式。
FileChannel中重要方法,read、write通過其實現類FileChannelImpl實現
    // 從這個通道讀入一個位元組序列到給定的緩衝區      public abstract int read(ByteBuffer dst) throws IOException;        // 從這個通道讀入指定開始位置和長度的位元組序列到給定的緩衝區      public abstract long read(ByteBuffer[] dsts, int offset, int length)          throws IOException;        // 從這個通道讀入一個位元組序列到給定的緩衝區      public final long read(ByteBuffer[] dsts) throws IOException {          return read(dsts, 0, dsts.length);      }          // 從給定的緩衝區寫入位元組序列到這個通道      public abstract int write(ByteBuffer src) throws IOException;           // 從給定緩衝區的子序列向該信道寫入位元組序列      public abstract long write(ByteBuffer[] srcs, int offset, int length)          throws IOException;           // 從給定的緩衝區寫入位元組序列到這個通道      public final long write(ByteBuffer[] srcs) throws IOException {          return write(srcs, 0, srcs.length);      }

FileChannelImpl中read、write方法實現

// read 方法實現  public int read(ByteBuffer var1) throws IOException {      this.ensureOpen();      if (!this.readable) {          throw new NonReadableChannelException();      } else {          synchronized(this.positionLock) {              int var3 = 0;              int var4 = -1;                try {                  this.begin();                  var4 = this.threads.add();                  if (!this.isOpen()) {                      byte var12 = 0;                      return var12;                  } else {                      do {                          var3 = IOUtil.read(this.fd, var1, -1L, this.nd);                      } while(var3 == -3 && this.isOpen());                        int var5 = IOStatus.normalize(var3);                      return var5;                  }              } finally {                  this.threads.remove(var4);                  this.end(var3 > 0);                    assert IOStatus.check(var3);              }          }      }  }    // write方法實現  public int write(ByteBuffer var1) throws IOException {      this.ensureOpen();      if (!this.writable) {          throw new NonWritableChannelException();      } else {          synchronized(this.positionLock) {              int var3 = 0;              int var4 = -1;                byte var5;              try {                  this.begin();                  var4 = this.threads.add();                  if (this.isOpen()) {                      do {                          var3 = IOUtil.write(this.fd, var1, -1L, this.nd);                      } while(var3 == -3 && this.isOpen());                          int var12 = IOStatus.normalize(var3);                      return var12;                  }                  var5 = 0;              } finally {                  this.threads.remove(var4);                  this.end(var3 > 0);                    assert IOStatus.check(var3);              }              return var5;          }      }  }

IOUtil中read、write實現:

//  read方法實現  static int read(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {      if (var1.isReadOnly()) {          throw new IllegalArgumentException("Read-only buffer");      } else if (var1 instanceof DirectBuffer) {          return readIntoNativeBuffer(var0, var1, var2, var4);      } else {          // 申請一塊和快取同大小的ByteBuffer var5          ByteBuffer var5 = Util.getTemporaryDirectBuffer(var1.remaining());            int var7;          try {              // 讀取數據到快取,底層由NativeDispatcher的read實現。              int var6 = readIntoNativeBuffer(var0, var5, var2, var4);              var5.flip();              if (var6 > 0) {                  // 把數據讀取到var1(用戶定義的快取,在jvm中分配記憶體)                  var1.put(var5);              }                var7 = var6;          } finally {              Util.offerFirstTemporaryDirectBuffer(var5);          }            return var7;      }  }    // write 方法實現  static int write(FileDescriptor var0, ByteBuffer var1, long var2, NativeDispatcher var4) throws IOException {      if (var1 instanceof DirectBuffer) {          return writeFromNativeBuffer(var0, var1, var2, var4);      } else {          int var5 = var1.position();          int var6 = var1.limit();          assert var5 <= var6;            int var7 = var5 <= var6 ? var6 - var5 : 0;          // 申請一塊ByteBuffer,大小為byteBuffer中的limit - position          ByteBuffer var8 = Util.getTemporaryDirectBuffer(var7);            int var10;          try {              // 複製byteBuffer中的數據              var8.put(var1);              var8.flip();              var1.position(var5);                // 把數據寫入到文件,底層由NativeDispatcher的write實現              int var9 = writeFromNativeBuffer(var0, var8, var2, var4);              if (var9 > 0) {                  var1.position(var5 + var9);              }              var10 = var9;          } finally {              Util.offerFirstTemporaryDirectBuffer(var8);          }          return var10;      }  }

 

小結

1、文件通道不能直接創建,只能通過InputStream、OutputStream或RandomAccessFile等創建對應的文件通道
2、文件通道FileChannel從緩衝區中讀取數據,使用read方法
3、文件通道FileChannel的read方法只能讀ByteBuffer緩衝區
 

DatagramChannel

DatagramChannel,使用UDP協議來進行傳輸。由於不需要建立連接,其實沒有客戶端服務端的概念,為了便於理解,我們定義其中一端為客戶端,一端為服務端

客戶端

public static void main(String[] args) throws Exception {      // 打開DatagramChannel      DatagramChannel datagramChannel = DatagramChannel.open();      // 綁定一個埠發送數據      ByteBuffer byteBuffer = ByteBuffer.wrap("A".getBytes());      int byteSent = datagramChannel.send(byteBuffer, new InetSocketAddress("127.0.0.1", 8000));      System.out.println("Byte sent is: " + byteSent);  }

服務端

public static void main(String[] args) throws Exception {      // 打開DatagramChannel,綁定一個埠      DatagramChannel datagramChannel = DatagramChannel.open();      datagramChannel.socket().bind(new InetSocketAddress(8000));        while (true) {          // 接收數據並輸出          ByteBuffer byteBuffer = ByteBuffer.allocate(1024);          datagramChannel.receive(byteBuffer);          byteBuffer.flip();          if(byteBuffer.hasRemaining()) {              System.out.print((char) byteBuffer.get());          }      }  }

 

ServerSocketChannel和SocketChannel

ServerSocketChannel是一個可以監聽新進來的TCP連接的通道。ServerSocketChannel本身不具備傳輸數據的能力,而只是負責監聽傳入的連接和創建新的SocketChannel。
SocketChannel是一個連接到TCP網路套接字的通道。通常SocketChannel在客戶端向伺服器發起連接請求,每個SocketChannel對象創建時都關聯一個對等的Socket對象。同樣SocketChannel也可以運行在非阻塞模式下。
可以通過以下2種方式創建SocketChannel:
1、打開一個SocketChannel並連接到互聯網上的某台伺服器
2、一個新連接到達ServerSocketChannel時,會創建一個SocketChannel
 

服務端

public static void main(String[] args) throws Exception {      // 服務端首先打開ServerSocketChannel,然後綁定一個埠      ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();      serverSocketChannel.socket().bind(new InetSocketAddress(8000));        // 服務端ServerSocketChannel收到連接請求時,返回一個SocketChannel對象      SocketChannel socketChannel = serverSocketChannel.accept();      while(true) {          // 把數據從channel中讀出來,然後寫入到buffer中然後列印          ByteBuffer buffer = ByteBuffer.allocate(128);          socketChannel.read(buffer);          buffer.flip();          if(buffer.hasRemaining()) {              System.out.println((char) buffer.get());          }      }  }

客戶端

public static void main(String[] args) throws Exception {      // 客戶端建立連接的過程,首先打開SocketChannel,然後連接到服務端      SocketChannel socketChannel = SocketChannel.open();      socketChannel.connect(new InetSocketAddress("127.0.0.1", 8000));        //連接是否建立成功       boolean isConnect = socketChannel.isConnected();        while (true) {          // 通過buffer,向channel中寫入數據          ByteBuffer buffer = ByteBuffer.allocate(128);          buffer.clear();          buffer.put(("A").getBytes());          buffer.flip();          socketChannel.write(buffer);          Thread.sleep(1000);      }  }