Java網路編程和NIO詳解7:淺談 Linux 中NIO Selector 的實現原理
- 2019 年 11 月 21 日
- 筆記
本文轉自互聯網
本系列文章將整理到我在GitHub上的《Java面試指南》倉庫,更多精彩內容請到我的倉庫里查看
https://github.com/h2pl/Java-Tutorial
喜歡的話麻煩點下Star哈
文章將同步到我的個人部落格:
www.how2playlife.com
本文是微信公眾號【Java技術江湖】的《不可輕視的Java網路編程》其中一篇,本文部分內容來源於網路,為了把本文主題講得清晰透徹,也整合了很多我認為不錯的技術部落格內容,引用其中了一些比較好的部落格文章,如有侵權,請聯繫作者。
該系列博文會告訴你如何從電腦網路的基礎知識入手,一步步地學習Java網路基礎,從socket到nio、bio、aio和netty等網路編程知識,並且進行實戰,網路編程是每一個Java後端工程師必須要學習和理解的知識點,進一步來說,你還需要掌握Linux中的網路編程原理,包括IO模型、網路編程框架netty的進階原理,才能更完整地了解整個Java網路編程的知識體系,形成自己的知識框架。
為了更好地總結和檢驗你的學習成果,本系列文章也會提供部分知識點對應的面試題以及參考答案。
如果對本系列文章有什麼建議,或者是有什麼疑問的話,也可以關注公眾號【Java技術江湖】聯繫作者,歡迎你參與本系列博文的創作和修訂。
概述
Selector是NIO中實現I/O多路復用的關鍵類。Selector實現了通過一個執行緒管理多個Channel,從而管理多個網路連接的目的。
Channel代表這一個網路連接通道,我們可以將Channel註冊到Selector中以實現Selector對其的管理。一個Channel可以註冊到多個不同的Selector中。
當Channel註冊到Selector後會返回一個SelectionKey對象,該SelectionKey對象則代表這這個Channel和它註冊的Selector間的關係。並且SelectionKey中維護著兩個很重要的屬性:interestOps、readyOps interestOps是我們希望Selector監聽Channel的哪些事件。
我們將我們感興趣的事件設置到該欄位,這樣在selection操作時,當發現該Channel有我們所感興趣的事件發生時,就會將我們感興趣的事件再設置到readyOps中,這樣我們就能得知是哪些事件發生了以做相應處理。
Selector的中的重要屬性
Selector中維護3個特別重要的SelectionKey集合,分別是
- keys:所有註冊到Selector的Channel所表示的SelectionKey都會存在於該集合中。keys元素的添加會在Channel註冊到Selector時發生。
- selectedKeys:該集合中的每個SelectionKey都是其對應的Channel在上一次操作selection期間被檢查到至少有一種SelectionKey中所感興趣的操作已經準備好被處理。該集合是keys的一個子集。
- cancelledKeys:執行了取消操作的SelectionKey會被放入到該集合中。該集合是keys的一個子集。
下面的源碼解析會說明上面3個集合的用處
Selector 源碼解析
下面我們通過一段對Selector的使用流程講解來進一步深入其實現原理。首先先來段Selector最簡單的使用片段
ServerSocketChannel serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); int port = 5566; serverChannel.socket().bind(new InetSocketAddress(port)); Selector selector = Selector.open(); serverChannel.register(selector, SelectionKey.OP_ACCEPT); while(true){ int n = selector.select(); if(n > 0) { Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey selectionKey = iter.next(); ...... iter.remove(); } } }
1、Selector的構建
SocketChannel、ServerSocketChannel和Selector的實例初始化都通過SelectorProvider類實現。
ServerSocketChannel.open();
public static ServerSocketChannel open() throws IOException { return SelectorProvider.provider().openServerSocketChannel(); }
SocketChannel.open();
public static SocketChannel open() throws IOException { return SelectorProvider.provider().openSocketChannel(); }
Selector.open();
public static Selector open() throws IOException { return SelectorProvider.provider().openSelector(); }
我們來進一步的了解下SelectorProvider.provider()
public static SelectorProvider provider() { synchronized (lock) { if (provider != null) return provider; return AccessController.doPrivileged( new PrivilegedAction<>() { public SelectorProvider run() { if (loadProviderFromProperty()) return provider; if (loadProviderAsService()) return provider; provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); } }
① 如果配置了「java.nio.channels.spi.SelectorProvider」屬性,則通過該屬性值load對應的SelectorProvider對象,如果構建失敗則拋異常。② 如果provider類已經安裝在了對系統類載入程式可見的jar包中,並且該jar包的源碼目錄META-INF/services包含有一個java.nio.channels.spi.SelectorProvider提供類配置文件,則取文件中第一個類名進行load以構建對應的SelectorProvider對象,如果構建失敗則拋異常。③ 如果上面兩種情況都不存在,則返回系統默認的SelectorProvider,即,sun.nio.ch.DefaultSelectorProvider.create(); ④ 隨後在調用該方法,即SelectorProvider.provider()。則返回第一次調用的結果。
不同系統對應著不同的sun.nio.ch.DefaultSelectorProvider

這裡我們看linux下面的sun.nio.ch.DefaultSelectorProvider
public class DefaultSelectorProvider { /** * Prevent instantiation. */ private DefaultSelectorProvider() { } /** * Returns the default SelectorProvider. */ public static SelectorProvider create() { return new sun.nio.ch.EPollSelectorProvider(); } }
可以看見,linux系統下sun.nio.ch.DefaultSelectorProvider.create(); 會生成一個sun.nio.ch.EPollSelectorProvider類型的SelectorProvider,這裡對應於linux系統的epoll
接下來看下 selector.open():
/** * Opens a selector. * * <p> The new selector is created by invoking the {@link * java.nio.channels.spi.SelectorProvider#openSelector openSelector} method * of the system-wide default {@link * java.nio.channels.spi.SelectorProvider} object. </p> * * @return A new selector * * @throws IOException * If an I/O error occurs */ public static Selector open() throws IOException { return SelectorProvider.provider().openSelector(); }
在得到sun.nio.ch.EPollSelectorProvider後調用openSelector()方法構建Selector,這裡會構建一個EPollSelectorImpl對象。
EPollSelectorImpl
class EPollSelectorImpl extends SelectorImpl{ // File descriptors used for interrupt protected int fd0; protected int fd1; // The poll object EPollArrayWrapper pollWrapper; // Maps from file descriptors to keys private Map<Integer,SelectionKeyImpl> fdToKey;
EPollSelectorImpl(SelectorProvider sp) throws IOException { super(sp); long pipeFds = IOUtil.makePipe(false); fd0 = (int) (pipeFds >>> 32); fd1 = (int) pipeFds; try { pollWrapper = new EPollArrayWrapper(); pollWrapper.initInterrupt(fd0, fd1); fdToKey = new HashMap<>(); } catch (Throwable t) { try { FileDispatcherImpl.closeIntFD(fd0); } catch (IOException ioe0) { t.addSuppressed(ioe0); } try { FileDispatcherImpl.closeIntFD(fd1); } catch (IOException ioe1) { t.addSuppressed(ioe1); } throw t; } }
EPollSelectorImpl構造函數完成:① EPollArrayWrapper的構建,EpollArrayWapper將Linux的epoll相關係統調用封裝成了native方法供EpollSelectorImpl使用。② 通過EPollArrayWrapper向epoll註冊中斷事件
void initInterrupt(int fd0, int fd1) { outgoingInterruptFD = fd1; incomingInterruptFD = fd0; epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN); }
③ fdToKey:構建文件描述符-SelectionKeyImpl映射表,所有註冊到selector的channel對應的SelectionKey和與之對應的文件描述符都會放入到該映射表中。
EPollArrayWrapper
EPollArrayWrapper完成了對epoll文件描述符的構建,以及對linux系統的epoll指令操縱的封裝。維護每次selection操作的結果,即epollwait結果的epollevent數組。EPollArrayWrapper操縱了一個linux系統下epoll_event結構的本地數組。
* typedef union epoll_data {* void *ptr;* int fd;* __uint32_t u32;* __uint64_t u64;* } epoll_data_t;** struct epoll_event {* __uint32_t events;* epoll_data_t data;* };
epollevent的數據成員(epolldatat data)包含有與通過epollctl將文件描述符註冊到epoll時設置的數據相同的數據。這裡data.fd為我們註冊的文件描述符。這樣我們在處理事件的時候持有有效的文件描述符了。
EPollArrayWrapper將Linux的epoll相關係統調用封裝成了native方法供EpollSelectorImpl使用。
private native int epollCreate(); private native void epollCtl(int epfd, int opcode, int fd, int events); private native int epollWait(long pollAddress, int numfds, long timeout, int epfd) throws IOException;
上述三個native方法就對應Linux下epoll相關的三個系統調用
// The fd of the epoll driver private final int epfd; // The epoll_event array for results from epoll_wait private final AllocatedNativeObject pollArray; // Base address of the epoll_event array private final long pollArrayAddress;
// 用於存儲已經註冊的文件描述符和其註冊等待改變的事件的關聯關係。在epoll_wait操作就是要檢測這裡文件描述法註冊的事件是否有發生。 private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE]; private final Map<Integer,Byte> eventsHigh = new HashMap<>();
EPollArrayWrapper() throws IOException { // creates the epoll file descriptor epfd = epollCreate(); // the epoll_event array passed to epoll_wait int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT; pollArray = new AllocatedNativeObject(allocationSize, true); pollArrayAddress = pollArray.address(); }
EPoolArrayWrapper構造函數,創建了epoll文件描述符。構建了一個用於存放epollwait返回結果的epollevent數組。
ServerSocketChannel的構建
ServerSocketChannel.open();
返回ServerSocketChannelImpl對象,構建linux系統下ServerSocket的文件描述符。
// Our file descriptor private final FileDescriptor fd; // fd value needed for dev/poll. This value will remain valid // even after the value in the file descriptor object has been set to -1 private int fdVal;
ServerSocketChannelImpl(SelectorProvider sp) throws IOException { super(sp); this.fd = Net.serverSocket(true); this.fdVal = IOUtil.fdVal(fd); this.state = ST_INUSE; }
將ServerSocketChannel註冊到Selector
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
public final SelectionKey register(Selector sel, int ops, Object att) throws ClosedChannelException { synchronized (regLock) { if (!isOpen()) throw new ClosedChannelException(); if ((ops & ~validOps()) != 0) throw new IllegalArgumentException(); if (blocking) throw new IllegalBlockingModeException(); SelectionKey k = findKey(sel); if (k != null) { k.interestOps(ops); k.attach(att); } if (k == null) { // New registration synchronized (keyLock) { if (!isOpen()) throw new ClosedChannelException(); k = ((AbstractSelector)sel).register(this, ops, att); addKey(k); } } return k; } }
protected final SelectionKey register(AbstractSelectableChannel ch, int ops, Object attachment) { if (!(ch instanceof SelChImpl)) throw new IllegalSelectorException(); SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this); k.attach(attachment); synchronized (publicKeys) { implRegister(k); } k.interestOps(ops); return k; }
① 構建代表channel和selector間關係的SelectionKey對象 ② implRegister(k)將channel註冊到epoll中 ③ k.interestOps(int) 完成下面兩個操作:a) 會將註冊的感興趣的事件和其對應的文件描述存儲到EPollArrayWrapper對象的eventsLow或eventsHigh中,這是給底層實現epoll_wait時使用的。b) 同時該操作還會將設置SelectionKey的interestOps欄位,這是給我們程式設計師獲取使用的。
EPollSelectorImpl. implRegister
protected void implRegister(SelectionKeyImpl ski) { if (closed) throw new ClosedSelectorException(); SelChImpl ch = ski.channel; int fd = Integer.valueOf(ch.getFDVal()); fdToKey.put(fd, ski); pollWrapper.add(fd); keys.add(ski); }
① 將channel對應的fd(文件描述符)和對應的SelectionKeyImpl放到fdToKey映射表中。② 將channel對應的fd(文件描述符)添加到EPollArrayWrapper中,並強制初始化fd的事件為0 ( 強制初始更新事件為0,因為該事件可能存在於之前被取消過的註冊中。) ③ 將selectionKey放入到keys集合中。
Selection操作
selection操作有3中類型:① select():該方法會一直阻塞直到至少一個channel被選擇(即,該channel註冊的事件發生了)為止,除非當前執行緒發生中斷或者selector的wakeup方法被調用。② select(long time):該方法和select()類似,該方法也會導致阻塞直到至少一個channel被選擇(即,該channel註冊的事件發生了)為止,除非下面3種情況任意一種發生:a) 設置的超時時間到達;b) 當前執行緒發生中斷;c) selector的wakeup方法被調用 ③ selectNow():該方法不會發生阻塞,如果沒有一個channel被選擇也會立即返回。
我們主要來看看select()的實現 :int n = selector.select();
public int select() throws IOException { return select(0); }
最終會調用到EPollSelectorImpl的doSelect
protected int doSelect(long timeout) throws IOException { if (closed) throw new ClosedSelectorException(); processDeregisterQueue(); try { begin(); pollWrapper.poll(timeout); } finally { end(); } processDeregisterQueue(); int numKeysUpdated = updateSelectedKeys(); if (pollWrapper.interrupted()) { // Clear the wakeup pipe pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0); synchronized (interruptLock) { pollWrapper.clearInterrupted(); IOUtil.drain(fd0); interruptTriggered = false; } } return numKeysUpdated; }
① 先處理註銷的selectionKey隊列 ② 進行底層的epoll_wait操作 ③ 再次對註銷的selectionKey隊列進行處理 ④ 更新被選擇的selectionKey
先來看processDeregisterQueue():
void processDeregisterQueue() throws IOException { Set var1 = this.cancelledKeys(); synchronized(var1) { if (!var1.isEmpty()) { Iterator var3 = var1.iterator(); while(var3.hasNext()) { SelectionKeyImpl var4 = (SelectionKeyImpl)var3.next(); try { this.implDereg(var4); } catch (SocketException var12) { IOException var6 = new IOException("Error deregistering key"); var6.initCause(var12); throw var6; } finally { var3.remove(); } } } } }
從cancelledKeys集合中依次取出註銷的SelectionKey,執行註銷操作,將處理後的SelectionKey從cancelledKeys集合中移除。執行processDeregisterQueue()後cancelledKeys集合會為空。
protected void implDereg(SelectionKeyImpl ski) throws IOException { assert (ski.getIndex() >= 0); SelChImpl ch = ski.channel; int fd = ch.getFDVal(); fdToKey.remove(Integer.valueOf(fd)); pollWrapper.remove(fd); ski.setIndex(-1); keys.remove(ski); selectedKeys.remove(ski); deregister((AbstractSelectionKey)ski); SelectableChannel selch = ski.channel(); if (!selch.isOpen() && !selch.isRegistered()) ((SelChImpl)selch).kill(); }
註銷會完成下面的操作:① 將已經註銷的selectionKey從fdToKey( 文件描述與SelectionKeyImpl的映射表 )中移除 ② 將selectionKey所代表的channel的文件描述符從EPollArrayWrapper中移除 ③ 將selectionKey從keys集合中移除,這樣下次selector.select()就不會再將該selectionKey註冊到epoll中監聽 ④ 也會將selectionKey從對應的channel中註銷 ⑤ 最後如果對應的channel已經關閉並且沒有註冊其他的selector了,則將該channel關閉 完成?的操作後,註銷的SelectionKey就不會出現先在keys、selectedKeys以及cancelKeys這3個集合中的任何一個。
接著我們來看EPollArrayWrapper.poll(timeout):
int poll(long timeout) throws IOException { updateRegistrations(); updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd); for (int i=0; i<updated; i++) { if (getDescriptor(i) == incomingInterruptFD) { interruptedIndex = i; interrupted = true; break; } } return updated; }
updateRegistrations()方法會將已經註冊到該selector的事件(eventsLow或eventsHigh)通過調用epollCtl(epfd, opcode, fd, events); 註冊到linux系統中。這裡epollWait就會調用linux底層的epollwait方法,並返回在epollwait期間有事件觸發的entry的個數
再看updateSelectedKeys():
private int updateSelectedKeys() { int entries = pollWrapper.updated; int numKeysUpdated = 0; for (int i=0; i<entries; i++) { int nextFD = pollWrapper.getDescriptor(i); SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD)); // ski is null in the case of an interrupt if (ski != null) { int rOps = pollWrapper.getEventOps(i); if (selectedKeys.contains(ski)) { if (ski.channel.translateAndSetReadyOps(rOps, ski)) { numKeysUpdated++; } } else { ski.channel.translateAndSetReadyOps(rOps, ski); if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) { selectedKeys.add(ski); numKeysUpdated++; } } } } return numKeysUpdated; }
該方法會從通過EPollArrayWrapper pollWrapper 以及 fdToKey( 構建文件描述符-SelectorKeyImpl映射表 )來獲取有事件觸發的SelectionKeyImpl對象,然後將SelectionKeyImpl放到selectedKey集合( 有事件觸發的selectionKey集合,可以通過selector.selectedKeys()方法獲得 )中,即selectedKeys。並重新設置SelectionKeyImpl中相關的readyOps值。
但是,這裡要注意兩點:
① 如果SelectionKeyImpl已經存在於selectedKeys集合中,並且發現觸發的事件已經存在於readyOps中了,則不會使numKeysUpdated++;這樣會使得我們無法得知該事件的變化。
?這點說明了為什麼我們要在每次從selectedKey中獲取到Selectionkey後,將其從selectedKey集合移除,就是為了當有事件觸發使selectionKey能正確到放入selectedKey集合中,並正確的通知給調用者。
再者,如果不將已經處理的SelectionKey從selectedKey集合中移除,那麼下次有新事件到來時,在遍歷selectedKey集合時又會遍歷到這個SelectionKey,這個時候就很可能出錯了。比如,如果沒有在處理完OP_ACCEPT事件後將對應SelectionKey從selectedKey集合移除,那麼下次遍歷selectedKey集合時,處理到到該SelectionKey,相應的ServerSocketChannel.accept()將返回一個空(null)的SocketChannel。
② 如果發現channel所發生I/O事件不是當前SelectionKey所感興趣,則不會將SelectionKeyImpl放入selectedKeys集合中,也不會使numKeysUpdated++
epoll原理
select,poll,epoll都是IO多路復用的機制。I/O多路復用就是通過一種機制,一個進程可以監視多個描述符,一旦某個描述符就緒(一般是讀就緒或者寫就緒),能夠通知程式進行相應的讀寫操作。但select,poll,epoll本質上都是同步I/O,因為他們都需要在讀寫事件就緒後自己負責進行讀寫,也就是說這個讀寫過程是阻塞的,而非同步I/O則無需自己負責進行讀寫,非同步I/O的實現會負責把數據從內核拷貝到用戶空間。
epoll是Linux下的一種IO多路復用技術,可以非常高效的處理數以百萬計的socket句柄。
在 select/poll中,進程只有在調用一定的方法後,內核才對所有監視的文件描述符進行掃描,而epoll事先通過epollctl()來註冊一 個文件描述符,一旦基於某個文件描述符就緒時,內核會採用類似callback的回調機制,迅速激活這個文件描述符,當進程調用epollwait() 時便得到通知。(此處去掉了遍歷文件描述符,而是通過監聽回調的的機制。這正是epoll的魅力所在。) 如果沒有大量的idle -connection或者dead-connection,epoll的效率並不會比select/poll高很多,但是當遇到大量的idle- connection,就會發現epoll的效率大大高於select/poll。
注意:linux下Selector底層是通過epoll來實現的,當創建好epoll句柄後,它就會佔用一個fd值,在linux下如果查看/proc/進程id/fd/,是能夠看到這個fd的,所以在使用完epoll後,必須調用close()關閉,否則可能導致fd被耗盡。
先看看使用c封裝的3個epoll系統調用:
- int epollcreate(int size)epollcreate建立一個epoll對象。參數size是內核保證能夠正確處理的最大句柄數,多於這個最大數時內核可不保證效果。
- int epollctl(int epfd, int op, int fd, struct epollevent *event)epollctl可以操作epollcreate創建的epoll,如將socket句柄加入到epoll中讓其監控,或把epoll正在監控的某個socket句柄移出epoll。
- int epollwait(int epfd, struct epollevent *events,int maxevents, int timeout)epoll_wait在調用時,在給定的timeout時間內,所監控的句柄中有事件發生時,就返回用戶態的進程。 大概看看epoll內部是怎麼實現的:
- epoll初始化時,會向內核註冊一個文件系統,用於存儲被監控的句柄文件,調用epoll_create時,會在這個文件系統中創建一個file節點。同時epoll會開闢自己的內核高速快取區,以紅黑樹的結構保存句柄,以支援快速的查找、插入、刪除。還會再建立一個list鏈表,用於存儲準備就緒的事件。
- 當執行epoll_ctl時,除了把socket句柄放到epoll文件系統里file對象對應的紅黑樹上之外,還會給內核中斷處理程式註冊一個回調函數,告訴內核,如果這個句柄的中斷到了,就把它放到準備就緒list鏈表裡。所以,當一個socket上有數據到了,內核在把網卡上的數據copy到內核中後,就把socket插入到就緒鏈表裡。
- 當epoll_wait調用時,僅僅觀察就緒鏈表裡有沒有數據,如果有數據就返回,否則就sleep,超時時立刻返回。 epoll的兩種工作模式:
- LT:level-trigger,水平觸發模式,只要某個socket處於readable/writable狀態,無論什麼時候進行epoll_wait都會返回該socket。
- ET:edge-trigger,邊緣觸發模式,只有某個socket從unreadable變為readable或從unwritable變為writable時,epoll_wait才會返回該socket。
socket讀數據

socket寫數據

最後順便說下在Linux系統中JDK NIO使用的是 LT ,而Netty epoll使用的是 ET。
後記
因為本人對電腦系統組成以及C語言等知識比較欠缺,因為文中相關知識點的表示也相當「膚淺」,如有不對不妥的地方望讀者指出。同時我也會繼續加強對該方面知識點的學習~
參考文章
http://www.jianshu.com/p/0d497fe5484ahttp://remcarpediem.com/2017/04/02/Netty源碼-三-I-O模型和Java-NIO底層原理/ 聖思園netty課程