Java-Netty前菜-NIO
NIO
NIO主要有三個核心部分組成:
- buffer緩衝區
- Channel管道
- Selector選擇器
在NIO中並不是以流的方式來處理數據的,而是以buffer緩衝區和Channel管道配合使用來處理數據。
NIO就是通過Channel管道運輸著存儲數據的Buffer緩衝區的來實現數據的處理!
Channel不與數據打交道,它只負責運輸數據。與數據打交道的是Buffer緩衝區
Selector
abstract class Selector implements Closeable
Selector open() // 創建當前執行緒的唯一一個選擇器
int select() // 返回當前選擇器的帶有事件的連接數量,阻塞操作
int select(1000) //返回當前選擇器的帶有事件的連接數量,阻塞1000毫秒
Selector wakeup(); // 喚醒selector
int selectNow() // 不阻塞,立刻返還
Set<SelectionKey> keys(); // 以set格式,返回當前selector所有連接
Set<SelectionKey> selectedKeys() // 以set格式,返回當前selector的所有 selectionKey
void close() // 釋放當前選擇器資源
ServerSocketChannel
在伺服器端監聽新的客戶端socket連接
sabstract class ServerSocketChannel
extends AbstractSelectableChannel
implements NetworkChannel
ServerSocketChannel open() // 新建監聽連接通道
ServerSocketChannel bind(SocketAddress local, int backlog) // 綁定ServerSocketChannel的地址
ServerSocket socket() // 獲取socket
SocketChannel accept() // 監聽並接收帶有請求連接事件的socketChannel通道連接,並返回該傳輸數據的通道socketChannel
SocketAddress getLocalAddress() //
SelectionKey register(Selector sel, int ops)// 綁定事件,告訴Selector當前ServerSocketChannel通道或SocketChannel通道關注的事件
boolean configureBlocking(boolean blok); // false表示採用非阻塞
SocketChannel
一個Selector可以註冊多個SocketChannel,網路IO通道,具體負責讀寫操作,把緩衝區數據寫入通道或把通道里的數據讀到緩衝區,讀和取是相對緩衝區而言
abstract class SocketChannel
extends AbstractSelectableChannel
implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel // 負責讀寫,因此多實現了三個位元組通道介面
SocketChannel open() // 新建傳輸數據的通道
SocketChannel bind(SocketAddress local) // 當前通道SocketChannel綁定ip
SelectionKey register() // 將socketChannel註冊到Selector上,並設置該連接的事件,相當於客戶端的socket,用於傳輸數據,返回SelectionKey
Socket socket(); // 獲取socket
int read(ByteBuffer dst) // 讀取緩衝區中的數據
int write(ByteBuffer src) // 將數據寫到緩衝區
// 綁定事件,告訴Selector當前ServerSocketChannel通道或SocketChannel通道關注的事件
SelectionKey register(Selector sel, int ops, Object att) // SocketChannel註冊到Selector,返回事件連接,att參數設置共享數據
boolean configureBlocking(boolean blok); // false表示採用非阻塞
abstract <T> SocketChannel setOption(SocketOption<T> name, T value) // 設置事件
abstract boolean connect(SocketAddress remote); // 連接伺服器
abstract boolean finishConnect() // 如果上面方法連接失敗,該方法完成連接操作
abstract int read(ByteBuffer dst) // 從通道里讀數據到緩衝區
abstract int write(ByteBuffer dst) // 將緩衝區的數據寫到通道里
void close() // 關閉通道
SelectionKey
連接事件,告訴Selector當前ServerSocketChannel通道或SocketChannel通道關注的事件
public abstract class SelectionKey
Selector selector(); // 反向獲取該事件連接對應的selector選擇器
SelectableChannel channel(); // 反向獲取該事件連接對應的channel通道,可以通過得到的channel完成業務處理
Object attachment(); // 得到與之關聯的共享數據
SelectionKey interestOps(int ops); // 設置或改變監聽事件
int OP_READ = 1 << 0; // 1 一般用於客戶端通道SocketChannel,用於設置通道為可讀事件
int OP_WRITE = 1 << 2; // 4 一般用於服務端通道ServerSocketChannel, 用於設置通道為可寫事件
int OP_CONNECT = 1 << 3; // 8 一般用於客戶端通道SocketChannel, 用於設置通道為可請求連接事件
int OP_ACCEPT = 1 << 4; // 16 一般用於服務端通道ServerSocketChannel,設置為可接收連接事件
boolean isReadable() //
boolean isWritable() //
boolean isConnectable() //
boolean isAcceptable() // 該SelectionKey是OP_ACCEPT事件,表示該Selector選擇器包含可接收連接事件
NIO模型
Buffer
Buffer類維護了4個核心變數屬性來提供關於其所包含的數組的資訊。它們是:
-
容量Capacity
-
- 緩衝區能夠容納的數據元素的最大數量。容量在緩衝區創建時被設定,並且永遠不能被改變。(底層是數組)
-
上界Limit
-
- 緩衝區里的數據的總數,代表了當前緩衝區中一共有多少數據。
-
位置Position
-
- 下一個要被讀或寫的元素的位置。Position會自動由相應的
get( )
和put( )
函數更新。
- 下一個要被讀或寫的元素的位置。Position會自動由相應的
-
標記Mark
-
- 一個備忘位置。用於記錄上一次讀寫的位置。
get() 和 put() 方法分別用於從緩衝區讀取數據和寫數據到緩衝區,
flip()
方法可以改動position和limit的位置,從快取區拿數據。當調用完filp()
:limit是限制讀到哪裡,而position是從哪裡讀,filp()
為「切換成讀模式」
讀完我們還想寫數據到緩衝區,那就使用clear()
函數,這個函數會「清空」緩衝區。數據沒有真正被清空,只是被遺忘掉了
直接與非直接緩衝區
- 非直接緩衝區是需要經過一個:copy的階段的(從內核空間copy到用戶空間)
- 直接緩衝區不需要經過copy階段,也可以理解成—>記憶體映射文件,(上面的圖片也有過例子)。
使用直接緩衝區有兩種方式:
- 緩衝區創建的時候分配的是直接緩衝區
- 在FileChannel上調用
map()
方法,將文件直接映射到記憶體中創建
Channel
使用FileChannel配合緩衝區實現文件複製的功能
使用記憶體映射文件的方式實現文件複製的功能(直接操作緩衝區)
MappedByteBuffer inMappedBuf = inChannel.map(MAPMODE.READ_ONLY, 0, inChannel.size());
通道之間通過transfer()
實現數據的傳輸(直接操作緩衝區)
(FileChannel)inChannel.transferTo(0, inChannel.size, (FileChannel) outChannel)
scatter、gather
分散讀取(scatter):將一個通道中的數據分散讀取到多個緩衝區中
聚集寫入(gather):將多個緩衝區中的數據集中寫入到一個通道中
NIO模型
文件描述符fd:Linux 的內核將所有外部設備都看做一個文件來操作,對一個文件的讀寫操作會調用內核提供的系統命令(api),返回一個file descriptor
(fd,文件描述符)。而對一個socket的讀寫也會有響應的描述符,稱為socket fd
(socket文件描述符),描述符就是一個數字,指向內核中的一個結構體(文件路徑,數據區等一些屬性)。在Linux下對文件的操作是利用文件描述符fd來實現的。
阻塞I/O模型:在進程(用戶)空間中調用recvfrom
,其系統調用直到數據包到達且被複制到應用進程的緩衝區中或者發生錯誤時才返回,在此期間一直等待。
阻塞I/O模型:recvfrom
從應用層到內核的時候,如果沒有數據就直接返回一個EWOULDBLOCK錯誤,一般都對非阻塞I/O模型進行輪詢檢查這個狀態,看內核是不是有數據到來。
I/O復用模型:
通常使用NIO是在網路IO中使用的,NIO都是在網路通訊的基礎之上,NIO是非阻塞的NIO也是網路中體現的,網路中使用NIO往往是I/O模型的多路復用模型!
Selector選擇器就好比提醒取餐的廣播。一個執行緒能夠管理多個Channel的狀態
調用select/poll/epoll/pselect
其中一個函數,傳入多個文件描述符,如果有一個文件描述符就緒,則返回,否則阻塞直到超時。
// poll
int poll(struct pollfd *fds,nfds_t nfds, int timeout);
struct pollfd {
int fd; /* 文件描述符 */
short events; /* 等待的事件 */
short revents; /* 實際發生了的事件 */
};
- (1)當用戶進程調用了select,那麼整個進程會被block;
- (2)而同時,kernel會「監視」所有select負責的socket;
- (3)當任何一個socket中的批量數據準備好了,select就會返回;
- (4)這個時候用戶進程再調用read操作,將數據從kernel批量拷貝到用戶進程(空間)。
所以,I/O 多路復用的特點是通過一種機制一個進程能同時等待多個文件描述符,而這些文件描述符其中的任意一個進入讀就緒狀態,select()函數就可以返回。
select/epoll的優勢並不是對於單個連接能處理得更快,而是在於能處理更多的連接。
非阻塞NIO用法
服務端
public class NoBlockServer {
public static void main(String[] args) throws IOException {
// 1.獲取通道
ServerSocketChannel server = ServerSocketChannel.open();
// 2.切換成非阻塞模式
server.configureBlocking(false);
// 3. 綁定連接
server.bind(new InetSocketAddress(6666));
// 4. 獲取選擇器
Selector selector = Selector.open();
// 4.1將通道註冊到選擇器上,指定接收「監聽通道」事件
server.register(selector, SelectionKey.OP_ACCEPT);
// 5. 輪訓地獲取選擇器上已「就緒」的事件--->只要select()>0,說明已就緒
while (selector.select() > 0) {
// 6. 獲取當前選擇器所有註冊的「選擇鍵」(已就緒的監聽事件)
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
// 7. 獲取已「就緒」的事件,(不同的事件做不同的事)
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
// 接收事件就緒
if (selectionKey.isAcceptable()) {
// 8. 獲取客戶端的鏈接
SocketChannel client = server.accept();
// 8.1 切換成非阻塞狀態
client.configureBlocking(false);
// 8.2 註冊到選擇器上-->拿到客戶端的連接為了讀取通道的數據(監聽讀就緒事件)
client.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) { // 讀事件就緒
// 9. 獲取當前選擇器讀就緒狀態的通道
SocketChannel client = (SocketChannel) selectionKey.channel();
// 9.1讀取數據
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 9.2得到文件通道,將客戶端傳遞過來的圖片寫到本地項目下(寫模式、沒有則創建)
FileChannel outChannel = FileChannel.open(Paths.get("2.png"), StandardOpenOption.WRITE, StandardOpenOption.CREATE);
while (client.read(buffer) > 0) {
// 在讀之前都要切換成讀模式
buffer.flip();
outChannel.write(buffer);
// 讀完切換成寫模式,能讓管道繼續讀取文件的數據
buffer.clear();
}
}
// 10. 取消選擇鍵(已經處理過的事件,就應該取消掉了)
iterator.remove();
}
}
}
}
客戶端
public class NoBlockClient2 {
public static void main(String[] args) throws IOException {
// 1. 獲取通道
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 6666));
// 1.1切換成非阻塞模式
socketChannel.configureBlocking(false);
// 1.2獲取選擇器
Selector selector = Selector.open();
// 1.3將通道註冊到選擇器中,獲取服務端返回的數據.在客戶端上要想獲取得到服務端的數據,也需要註冊在register上(監聽讀事件)!
socketChannel.register(selector, SelectionKey.OP_READ);
// 2. 發送一張圖片給服務端吧
FileChannel fileChannel = FileChannel.open(Paths.get("X:\\Users\\ozc\\Desktop\\新建文件夾\\1.png"), StandardOpenOption.READ);
// 3.要使用NIO,有了Channel,就必然要有Buffer,Buffer是與數據打交道的呢
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 4.讀取本地文件(圖片),發送到伺服器
while (fileChannel.read(buffer) != -1) {
// 在讀之前都要切換成讀模式
buffer.flip();
socketChannel.write(buffer);
// 讀完切換成寫模式,能讓管道繼續讀取文件的數據
buffer.clear();
}
// 5. 輪訓地獲取選擇器上已「就緒」的事件--->只要select()>0,說明已就緒
while (selector.select() > 0) {
// 6. 獲取當前選擇器所有註冊的「選擇鍵」(已就緒的監聽事件)
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
// 7. 獲取已「就緒」的事件,(不同的事件做不同的事)
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
// 8. 讀事件就緒
if (selectionKey.isReadable()) {
// 8.1得到對應的通道
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer responseBuffer = ByteBuffer.allocate(1024);
// 9. 知道服務端要返迴響應的數據給客戶端,客戶端在這裡接收
int readBytes = channel.read(responseBuffer);
if (readBytes > 0) {
// 切換讀模式
responseBuffer.flip();
System.out.println(new String(responseBuffer.array(), 0, readBytes));
}
}
// 10. 取消選擇鍵(已經處理過的事件,就應該取消掉了)
iterator.remove();
}
}
}
}
- 將Socket通道註冊到Selector中,監聽感興趣的事件
- 當感興趣的時間就緒時,則會進去我們處理的方法進行處理
- 每處理完一次就緒事件,刪除該選擇鍵(迭代器)(因為我們已經處理完了)
NIO實現UDP
pipe管道
NIO的管道Pipe是2個執行緒之間的單項數據連接,pipe有source通道和sink通道,數據會被寫到sink通過,客戶端從source通道接收數據
IO主要問題
- 執行緒資源受限:執行緒是作業系統中非常寶貴的資源,同一時刻有大量的執行緒處於阻塞狀態是非常嚴重的資源浪費,作業系統耗不起
- 執行緒切換效率低下:單機cpu核數固定,執行緒爆炸之後作業系統頻繁進行執行緒切換,應用性能急劇下降。
- 除了以上兩個問題,IO編程中,我們看到數據讀寫是以位元組流為單位,效率不高。
為了解決這三個問題,JDK在1.4之後提出了NIO。
執行緒資源受限
NIO編程模型中,新來一個連接不再創建一個新的執行緒,而是可以把這條連接直接綁定到某個固定的執行緒,然後這條連接所有的讀寫都由這個執行緒來負責
如上圖所示,IO模型中,一個連接來了,會創建一個執行緒,對應一個while死循環,死循環的目的就是不斷監測這條連接上是否有數據可以讀,大多數情況下,1w個連接裡面同一時刻只有少量的連接有數據可讀,因此,很多個while死循環都白白浪費掉了,因為讀不出啥數據。
而在NIO模型中,他把這麼多while死循環變成一個死循環,這個死循環由一個執行緒控制,那麼他又是如何做到一個執行緒,一個while死循環就能監測1w個連接是否有數據可讀的呢?
這就是NIO模型中selector的作用,一條連接來了之後,現在不創建一個while死循環去監聽是否有數據可讀了,而是直接把這條連接註冊到selector上,然後,通過檢查這個selector,就可以批量監測出有數據可讀的連接,進而讀取數據
實際開發過程中,我們會開多個執行緒,每個執行緒都管理著一批連接,相對於IO模型中一個執行緒管理一條連接,消耗的執行緒資源大幅減少
執行緒切換效率低下
由於NIO模型中執行緒數量大大降低,執行緒切換效率因此也大幅度提高
IO讀寫以位元組為單位
NIO解決這個問題的方式是數據讀寫不再以位元組為單位,而是以位元組塊為單位。IO模型中,每次都是從作業系統底層一個位元組一個位元組地讀取數據,而NIO維護一個緩衝區,每次可以從這個緩衝區裡面讀取一塊的數據
原生JDK的NIO寫法
- NIO模型中通常會有兩個執行緒,每個執行緒綁定一個輪詢器selector,在我們這個例子中
serverSelector
負責輪詢是否有新的連接,clientSelector
負責輪詢連接是否有數據可讀 - 服務端監測到新的連接之後,不再創建一個新的執行緒,而是直接將新連接綁定到
clientSelector
上,這樣就不用IO模型中1w個while循環在死等,參見(1) clientSelector
被一個while死循環包裹著,如果在某一時刻有多條連接有數據可讀,那麼通過clientSelector.select(1)
方法可以輪詢出來,進而批量處理,參見(2)- 數據的讀寫以記憶體塊為單位,參見(3)
/**
* @author 閃電俠
*/
public class NIOServer {
public static void main(String[] args) throws IOException {
Selector serverSelector = Selector.open();
Selector clientSelector = Selector.open();
new Thread(() -> {
try {
// 對應IO編程中服務端啟動
ServerSocketChannel listenerChannel = ServerSocketChannel.open();
listenerChannel.socket().bind(new InetSocketAddress(8000));
listenerChannel.configureBlocking(false); // 監聽器設置為非阻塞
listenerChannel.register(serverSelector, SelectionKey.OP_ACCEPT);
while (true) {
// 監測是否有新的連接,這裡的1指的是阻塞的時間為1ms
if (serverSelector.select(1) > 0) {
Set<SelectionKey> set = serverSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = set.iterator();
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isAcceptable()) { // 接收到新連接
try {
// (1) 每來一個新連接,不需要創建一個執行緒,而是直接註冊到clientSelector
SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
clientChannel.configureBlocking(false);
// clientChannel 註冊到 clientSelector
clientChannel.register(clientSelector, SelectionKey.OP_READ);
} finally {
keyIterator.remove(); // 釋放連接迭代器
}
}
}
}
}
} catch (IOException ignored) {
}
}).start();
new Thread(() -> {
try {
while (true) {
// (2) 批量輪詢是否有哪些連接有數據可讀,這裡的1指的是阻塞的時間為1ms
if (clientSelector.select(1) > 0) {
// 獲取set結構的連接集合
Set<SelectionKey> set = clientSelector.selectedKeys();
Iterator<SelectionKey> keyIterator = set.iterator();
// 遍歷所有連接,查詢有可讀數據的連接
while (keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if (key.isReadable()) { // 該連接有可讀數據
try {
SocketChannel clientChannel = (SocketChannel) key.channel();
// 給快取塊分配記憶體地址,用於存放批量連接
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// (3) 讀取數據,以塊為單位從clientChannel管道中批量讀取數據當到Bytebuffer
clientChannel.read(byteBuffer);
byteBuffer.flip(); // 轉換為讀模式
System.out.println(Charset.defaultCharset().newDecoder().decode(byteBuffer)
.toString());
} finally {
keyIterator.remove();
key.interestOps(SelectionKey.OP_READ);
}
}
}
}
}
} catch (IOException ignored) {
}
}).start();
}
}
NIO實現群聊系統
server端
public class GroupChatClient {
private final String HOST = "127.0.0.1";
private final int PORT = 6668;
private Selector selector;
private SocketChannel socketChannel;
private String username;
public GroupChatClient() throws IOException {
selector = Selector.open();
socketChannel = socketChannel.open(new InetSocketAddress("127.0.0.1", PORT));
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
username = socketChannel.getLocalAddress().toString().substring(1);
System.out.println(username + "is ok...");
}
// 客戶端發送資訊
public void sendInfo(String info) {
info = username + "說:" + info;
try {
// 將緩衝區中的數據寫到socketChannel通道中
socketChannel.write(ByteBuffer.wrap(info.getBytes()));
} catch (Exception e) {
e.printStackTrace();
}
}
// 讀取數據
public void readInfo() {
try {
// 可用通道數量,即與selector綁定的通道
int readChannels = selector.select();
if(readChannels > 0) { // 有可以用的通道
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
// 從通道讀取數據到buffer
sc.read(buffer);
String msg = new String(buffer.array());
System.out.println(msg.trim()); // 去掉msg頭尾空格
}
}
iterator.remove(); // 防止重複讀取同一個通道的數據
} else {
// System.out.println("沒有可以用的通道。。");
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
GroupChatClient chatClient = new GroupChatClient();
new Thread() {
@Override
public void run() {
while(true) {
chatClient.readInfo();
try {
Thread.currentThread().sleep(3000);
} catch (Exception e){
e.printStackTrace();
}
}
}
}.start();
// 發送數據給服務端
Scanner scanner = new Scanner(System.in);
while(scanner.hasNextLine()) {
String s = scanner.nextLine();
chatClient.sendInfo(s);
}
}
}
client端
public class GroupChatServer {
private static Selector selector;
private static ServerSocketChannel serverSocketChannel;
private static final int PORT = 6668;
// 構造器初始化
public GroupChatServer() {
try {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(PORT));
serverSocketChannel.configureBlocking(false);
// 設置該服務端socket通道為可接收事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
// 伺服器啟動並監聽6667埠
public void listen() {
try {
while(true) {
// 監聽連接通道,返回有事件觸發的通道的個數
// 阻塞操作,直到至少獲取到一個帶有事件的連接
int count = selector.select(2000);
// 有可讀數據的連接請求
if(count > 0) {
// 獲取帶事件的連接的迭代器
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
// 遍歷所有帶有可接受時間的連接
while(iterator.hasNext()) {
// 接收包含可讀數據的連接
SelectionKey selectionKey = iterator.next();
// 監聽到有可連接事件的通道
if(selectionKey.isAcceptable()) { // 上面構造函數時已經設置了OP_ACCEPT
// 根據連接通道獲取到傳輸數據的通道,相當於BIO的socket
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
// 註冊時間
// 將傳輸數據的通道SocketChannel註冊到selecgtor選擇器中
// 通道註冊,使得連接通道與傳輸數據通道關聯,即ServerSocketChannel和SocketChannel關聯
// 選擇OP_READ事件,SocketChannel相當於轉換為讀模式
socketChannel.register(selector, SelectionKey.OP_READ); // 告訴selector綁定的讀事件
System.out.println(socketChannel.getRemoteAddress() + "上線了");
}
// 該通道觸發可讀事件
if(selectionKey.isReadable()) {
readData(selectionKey);
}
}
iterator.remove();
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
}
}
// 負責讀取客戶端消息,並轉發到其他客戶端
private void readData(SelectionKey key) {
SocketChannel channel = null;
try {
channel = (SocketChannel) key.channel();
// 創建緩衝
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
// 讀取該socketChannel通道里的數據到緩衝區里
int count = channel.read(byteBuffer);
if(count > 0) {
String msg = new String(byteBuffer.array());
System.out.println("from 客戶端:" + msg);
// 向其他客戶端轉發消息
sendInfoToOtherClients(msg, channel);
}
} catch (Exception e) {
// 可能在讀取該客戶端數據時,該客戶端關閉或宕機或網路抖動或離線
try {
System.out.println(channel.getRemoteAddress() + "離線了。。。");
// 取消該客戶端的註冊
key.cancel();
channel.close(); // 該客戶端的SocketChannel關閉
} catch (IOException e1) {
e1.printStackTrace();
}
} finally {
}
}
// 轉發消息給其他客戶
private void sendInfoToOtherClients(String msg, SocketChannel selfChannel) throws IOException {
System.out.println("伺服器轉發消息中");
for(SelectionKey key : selector.keys()) {
Channel targetChannel = key.channel();
// 排除自己,不需要轉發自己的消息給自己
if(targetChannel instanceof SocketChannel && targetChannel != selfChannel) {
SocketChannel dest = (SocketChannel) targetChannel;
ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
dest.write(buffer); // 將緩衝區數據寫入到dest通道
}
}
}
public static void main(String[] args) {
GroupChatServer groupChatServer = new GroupChatServer();
groupChatServer.listen();
}
}
socket相關Linux命令
執行並追蹤執行中的Java執行緒:stace -ff -o out java [要追蹤的 java 文件]
查看當前進程狀態以及pid:netstat -natp
列印某個文件尾10行: tail -f xxx
實現tcp三次握手連接:**nc localhost 8090 **
vim顯示行數::set nu
vim跳到指定第5行:ngg/5G 或 :n
查看有關係統調用的指令:man 2 [要查看的系統調用命令, 如socket]
NIO零拷貝
1、傳統IO需要進行4次拷貝,2次DMA(直接記憶體拷貝,不使用cpu)和2次cpu拷貝(消耗性能多),3次上下文切換,性能較差
2、而NIO出現了mmap,對傳統IO進行優化。使得IO只需要2次DMA拷貝,1次CPU拷貝,三次上下文切換
mmap:通過記憶體映射,將文件映射到內核緩衝區,同時用戶空間可以共享內核空間的數據。進行網路傳輸時,就可以減少內核空間到用戶空間的拷貝次數
3、進一步優化:sendFile函數
Linux2.1提供的sendFile函數,其原理:數據不經過用戶態,而是直接從內核緩衝區進入到Socket Buffer,減少了一次上下文切換
4、Linux2.4優化,即實現了零拷貝(不進行CPU拷貝就是零拷貝):
避免了從內核緩衝區拷貝到Socketbuffer,而是直接拷貝到協議棧protocal engine,從而減少了數據拷貝(CPU拷貝),因此為:2次DMA copy,2次上下文切換。
(注意:零拷貝中間還是有一次cpu拷貝的,即copy desc,但是拷貝的資訊很少,如kernel buffer的length、offset等描述資訊,消耗低,可忽略)
零拷貝是從作業系統角度來說的,即內核緩衝區kernel buffer之間沒有重複的數據
零拷貝的使用
FileChannel fileChannel = FileInputStream("fileName").getChannel;
long transferCount = fileChannel.transferTO(0, fileChannel.size(), socketChannel);
NIO的BUG
epoll空輪詢bug
epoll空輪詢bug體現在Selector空輪詢, 若Selector的輪詢結果為空,也沒有wakeup或新消息處理,則發生空輪詢,CPU使用率100%
原因:
- 正常情況下,
selector.select()
操作是阻塞的,只有被監聽的fd有讀寫操作時,才被喚醒 - 但是,在這個bug中,沒有任何fd有讀寫請求,但是
select()
操作依舊被喚醒 - 很顯然,這種情況下,
selectedKeys()
返回的是個空數組 - 然後按照邏輯執行到
while(true)
處,循環執行,導致死循環。
Netty的解決辦法
-
- 根據該BUG的特徵,首先偵測該BUG是否發生
偵測方法:對Selector的select操作周期進行統計,每完成一次空的select操作進行一次計數;
若在某個周期內連續發生N次空輪詢,則觸發了epoll死循環bug, netty默認是512次
-
- 將問題Selector上註冊的Channel轉移到新建的Selector上;老的問題Selector關閉,使用新建的Selector替換。
在netty中使用 rebuildSelector() 方法重建Selector,判斷是否是其他執行緒發起的重建請求,若不是則將原SocketChannel從舊的Selector上去除註冊,重新註冊到新的Selector上,並將原來的Selector關閉。
public void rebuildSelector() {
if (!inEventLoop()) {
execute(new Runnable() {
@Override
public void run() {
rebuildSelector0();
}
});
return;
}
rebuildSelector0();
}
private void rebuildSelector0() {
final Selector oldSelector = selector;
final SelectorTuple newSelectorTuple;
if (oldSelector == null) {
return;
}
try {
newSelectorTuple = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (SelectionKey key: oldSelector.keys()) {
Object a = key.attachment();
try {
if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
selector = newSelectorTuple.selector;
unwrappedSelector = newSelectorTuple.unwrappedSelector;
try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
if (logger.isInfoEnabled()) {
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
}
netty 會在每次進行 selector.select(timeoutMillis) 之前記錄一下開始時間currentTimeNanos,在select之後記錄一下結束時間,判斷select操作是否至少持續了timeoutMillis秒(這裡將time – TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos改成time – currentTimeNanos >= TimeUnit.MILLISECONDS.toNanos(timeoutMillis)或許更好理解一些),
如果持續的時間大於等於timeoutMillis,說明就是一次有效的輪詢,重置selectCnt標誌,否則,表明該阻塞方法並沒有阻塞這麼長時間,可能觸發了jdk的空輪詢bug,當空輪詢的次數超過一個閥值的時候,默認是512,就開始重建selector