NIO(三):Selector選擇器
2020 年 8 月 5 日
筆記
NIO , Nio-Netty
一.堵塞式與非堵塞式
在傳統IO中,將數據由當前線程從客戶端傳入服務端,由服務端的內核進行判斷傳過來的數據是否合法,內核中是否存在數據。
如果不存在數據 ,並且數據並不合法,當前線程將會堵塞等待。當前線程將無法進行下一步傳輸,進行排隊現象。降低系統性能。
為了解決這一步問題,調用資源開闢多個線程傳輸。
雖然線程的開闢解決了部分堵塞排隊的問題,但由於並沒有治理根本堵塞的原因,線程數量也是有限的。總會有堵塞的線程 ,形成排隊現象。
為了根本解決堵塞的問題。NIO的非堵塞式成為了主要的傳輸方式。
在客戶端和服務端之間將通道註冊到selector選擇器,由選擇器進行監聽channel是否進行什麼操作(read()or write())。
當數據就緒或者準備完成時,由selector進行分配到服務端的一個(或多個)線程上進行相關運行操作。
在IO的堵塞後無腦調用線程下。NIO是在準備完成時,才被selector選擇分配到一個或者多個線程上傳輸並被複制到內核地址空間中,由於數據已準備完成或者已就緒,內核就無須被堵塞。
二.Selector(選擇器)
也稱多路復用器,多條channel復用selector。channe通過註冊到selector ,使selector對channel進行監聽,
實現儘可能少的線程管理多個連接。減少了 線程的使用,降低了因為線程的切換引起的不必要額資源浪費和多餘的開銷。
也是網絡傳輸非堵塞的核心組件。
三.Selector的使用
分為客戶端和服務端兩部分:
先實現客戶端吧:
流程: 獲取通道綁定主機端口 –> 切換非堵塞狀態 –> 開闢buffer容量 –> 將當前時間作為數據寫入buffer待傳 –> 切換讀寫方式flip() –> 寫入通道 –>清空並關閉
1 /*
2 * 客戶端發送數據 通過channel通道
3 * */
4 @Test
5 public void Client() throws IOException {
6
7 // 獲取channel通道 並設置主機號和端口號
8 SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",8080));
9
10 // 因為使用非阻塞NIO 所以必須切換為非阻塞
11 socketChannel.configureBlocking(false ); //默認為true 需要改為非堵塞的
12
13 // 開闢緩衝區進行存儲數據
14 ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
15
16 // 準備工作就緒後,準備發送數據給服務端
17 // 打印當前日期轉為Byte數據傳出
18 byteBuffer.put(new Date().toString().getBytes());
19 // 切換讀寫模式
20 byteBuffer.flip();
21 // 寫入通道
22 socketChannel.write(byteBuffer);
23 // 完畢時,清除緩衝區內容
24 byteBuffer.clear();
25
26 // ====================
27 // 關閉相關流
28 socketChannel.close();
29
30 }
在獲取當前時間是用的new Date();還可以使用java8的獲取時間的方法。
LocalDateTime.now().toString().getBytes() //轉為Byte位元組
因為是網絡傳輸的心形式,所以在獲取channel時,使用SocketChannel.open方法。實現方法:
1 public static SocketChannel open(SocketAddress remote)
2 throws IOException
3 {
4 SocketChannel sc = open();
5 try {
6 sc.connect(remote); // 打開一個新的channel時,綁定連接到主機和端口上
7 } catch (Throwable x) {
8 try {
9 sc.close(); // 異常時關閉連接
10 } catch (Throwable suppressed) {
11 x.addSuppressed(suppressed);
12 }
13 throw x;
14 }
15 assert sc.isConnected();
16 return sc;
17 }
new InetSocketAddress實例創建主機和端口。
*/
public InetSocketAddress(String hostname, int port) {
checkHost(hostname); // 檢查主機號是否為空 為空返回異常。
InetAddress addr = null ;
String host = null ;
try {
addr = InetAddress.getByName(hostname);
} catch (UnknownHostException e) {
host = hostname;
}
holder = new InetSocketAddressHolder(host, addr, checkPort(port)); //檢查端口。
} //檢查端口方法
private static int checkPort(int port) { if (port < 0 || port > 0xFFFF) throw new IllegalArgumentException("port out of range:" + port); return port; } //檢查主機號方法 private static String checkHost(String hostname) { if (hostname == null) throw new IllegalArgumentException("hostname can't be null"); return hostname; }
服務端:
流程:使用ServerSocketChannel 的方法獲取服務端額channel –> 切換為堵塞狀態 –> 為buffer分配容量 –> 綁定端口號 –> 獲取selector選擇器 –> channel註冊進選擇器中,並進行監聽 –> 選擇器進行輪詢,進行下一步讀寫操作。
1 /*
2 * 服務端接收客戶端傳來的數據
3 * */
4 @Test
5 public void server() throws IOException {
6
7 // 獲取channel通道
8 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
9 // 切換為非堵塞狀態
10 serverSocketChannel.configureBlocking(false );
11 // 分配服務端的緩衝區
12 ByteBuffer serverByteBuffer = ByteBuffer.allocate(1024);
13 // 將客戶端的InetSocketAddress綁定到通道,不綁定 不統一將獲取不到數據
14 serverSocketChannel.bind(new InetSocketAddress(8080));
15 // 獲取選擇器
16 Selector selector = Selector.open();
17 // 將通道註冊到選擇器中,並且制定監聽方式
18 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
19 // 進行輪詢選擇器上就緒成功的事件 當存在就緒成功的及進行下一步
20 while (selector.select() > 0){
21 // 對已存在的就緒事件進行迭代
22 Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();
23
24 // 有元素就進行下一步
25 while (selectionKeyIterator.hasNext()){
26 // 獲取到就緒事件
27 SelectionKey next = selectionKeyIterator.next();
28
29 // 對獲取到的就緒事件判斷是何種類型
30 if (next.isAcceptable()){
31
32 // 獲取連接
33 SocketChannel accept = serverSocketChannel.accept();
34
35 // 將獲取到的連接切換為非堵塞模式
36 accept.configureBlocking(false );
37
38 // 將獲取到的鏈接 註冊金selector
39 accept.register(selector,SelectionKey.OP_READ);
40
41 // 判斷是否準備好讀
42 }else if (next.isReadable()){
43
44 // 獲取已就緒的通道
45 SocketChannel channel = (SocketChannel) next.channel();
46
47 // 分配緩衝區
48 ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
49
50 // 讀取數據
51 int length = 0 ;
52 while ((length = channel.read(byteBuffer)) > 0){
53 byteBuffer.flip();
54 System.out.println(new String(byteBuffer.array(),0,length));
55 byteBuffer.clear();
56 }
57
58
59 }
60
61 // 完成傳輸需要取消選擇鍵,防止下次出問題
62 selectionKeyIterator.remove();
63
64 }
65 }
66
67
68 }
如何獲取選擇器?
Selector selector = Selector.open();
實現過程:
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
//首先進入此方法判斷是否存在選擇器
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null ) //第一次為false
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
} //false時 跳入如下方法。
public static ServerSocketChannel open() throws IOException { return SelectorProvider.provider().openServerSocketChannel(); }
隨後將獲取到的通道註冊到獲取到的選擇器中,在註冊時給定監聽方式:
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //可多選監聽操作項
selectionKey中定義了四個可操作項:
OP_READ 可讀就緒
OP_WRITE 可寫就緒
OP_CONNECT 連接就緒
OP_ACCEPT 接收就緒
迭代key中已就緒的元素。
Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();
獲取到當前就緒事件叢迭代器中獲取。
selectionKeyIterator.next()
selectionKey包含四個方法:
isReadable():測試此選擇鍵是否可讀
isWritable():測試此選擇鍵是否可寫
isConnectable():測試此選擇鍵是否完成
isAcceptable():測試此選擇鍵是否可以接受一個新的連接
通過這些相應的方法,單獨判斷是否可以讀寫,和進行操作。
最後取消選擇鍵,防止下次獲取出現異常情況。(第一次判斷可能會為true)
selectionKeyIterator.remove();
四.附加
在上面的例子中,把客戶端的代碼進行稍微改寫一下,使之能夠無限輸入,並通過傳輸打印在服務端中。
public static void main(String[] args) throws IOException {
// 獲取channel通道 並設置主機號和端口號
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",8080));
// 因為使用非阻塞NIO 所以必須切換為非阻塞
socketChannel.configureBlocking(false );
// 開闢緩衝區進行存儲數據
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 附加輸入:
Scanner scanner = new Scanner(System.in);
//通過控制台鍵入數據
while (scanner.hasNext()){
String str = scanner.next();
//準備工作就緒後,準備發送數據給服務端
//打印當前日期轉為Byte數據傳出
byteBuffer.put((new Date().toString()+":--->"+str).getBytes());
//切換讀寫模式
byteBuffer.flip();
//寫入通道
socketChannel.write(byteBuffer);
//完畢時,清除緩衝區內容
byteBuffer.clear();
}
}
由於掃描流(scanner)不能用於測試類,所以在main方法下進行測試:
每次輸入的內容都會被轉為Byte位元組進行傳輸。
客戶端輸入結果:
服務端輸出結果:
每輸入一次便傳輸一次。
// 完成傳輸需要取消選擇鍵,防止下次出問題 selectionKeyIterator .remove();