NIO-EPollSelectorIpml源碼分析
- 2020 年 3 月 2 日
- 筆記
目錄
NIO-EPollSelectorIpml源碼分析
目錄
NIO-概覽
NIO-Buffer
NIO-Channel
NIO-Channel接口分析
NIO-SocketChannel源碼分析
NIO-FileChannel源碼分析
NIO-Selector源碼分析
NIO-WindowsSelectorImpl源碼分析
NIO-EPollSelectorIpml源碼分析
前言
本來是想學習Netty的,但是Netty是一個NIO框架,因此在學習netty之前,還是先梳理一下NIO的知識。通過剖析源碼理解NIO的設計原理。
本系列文章針對的是JDK1.8.0.161的源碼。
NIO-Selector源碼分析對Selector
的功能和創建過程進行了分析,本篇對Linux環境下JDK實現的EPollSelectorImpl
源碼進行詳細講解。
本篇文章不會對EPoll算法進行詳細介紹,對epoll算法感興趣或還不了解的同學可以看epoll原理詳解及epoll反應堆模型先進行學習。
在詳細介紹EpollSelectorProvider
之前我們先了解一下EPoll主要的三個步驟:
- 調用
epoll_create
建立一個epoll 對象(在epoll文件系統中給這個句柄分配資源); - 調用
epoll_ctl
向epoll對象中添加或刪除文件句柄及監控事件。 - 調用
epoll_wait
收集發生事件的文件描述符。
初始化EPollSelectorProvider
NIO-Selector源碼分析提到,若沒有進行配置時,默認通過sun.nio.ch.DefaultSelectorProvider.create()
創建SelectorProvider
。Linux下的代碼路徑在jdksrcsolarisclassessunniochDefaultSelectorProvider.java
。在其內部通過實際是創建了一個EPollSelectorProvider
。
創建EPollSelectorImpl
EPollSelectorProvider
是用於創建EPollSelectorImpl
的。
Selector.Open()-> SelectorProvider.provider()-> sun.nio.ch.DefaultSelectorProvider.create()-> EPollSelectorProvider.openSelector()-> new EPollSelectorImpl(this) public class EPollSelectorProvider extends SelectorProviderImpl { public AbstractSelector openSelector() throws IOException { return new EPollSelectorImpl(this); } public Channel inheritedChannel() throws IOException { return InheritedChannel.getChannel(); } }
inheritedChannel()
可以返回系統默認SelectorProvider創建的通道,主要有些操作系統底層需要調用默認的通道。
EPollSelectorImpl結構
在詳細講解EPollSelectorImpl
源碼之前,先了解EPollSelectorImpl
的主要的數據結構和屬性。
名稱 | 作用 |
---|---|
Map<Integer,SelectionKeyImpl> fdToKey | 保存文件描述符句柄和的SelectionKey的映射關係 |
int fd0 | 管道的讀端文件描述符 |
int fd1 | 管道的寫端文件描述符 |
EPollArrayWrapper pollWrapper | 調用底層Epoll算法的包裝類 |
EPollSelectorImpl(SelectorProvider sp) throws IOException { super(sp); long pipeFds = IOUtil.makePipe(false); fd0 = (int) (pipeFds >>> 32); //無符號移位 fd1 = (int) pipeFds; pollWrapper = new EPollArrayWrapper(); pollWrapper.initInterrupt(fd0, fd1); fdToKey = new HashMap<>(); } void initInterrupt(int fd0, int fd1) { outgoingInterruptFD = fd1; incomingInterruptFD = fd0; //將管道的讀取端註冊 epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN); }
pipeFds高32位存放的是通道read端的文件描述符FD,低32位存放的是write端的文件描述符。這裡做移位處理。
通過調用JNI的makePipe
方法創建單向管道。
JNIEXPORT jlong JNICALL Java_sun_nio_ch_IOUtil_makePipe(JNIEnv *env, jobject this, jboolean blocking) { int fd[2]; if (pipe(fd) < 0) { JNU_ThrowIOExceptionWithLastError(env, "Pipe failed"); return 0; } if (blocking == JNI_FALSE) { //配置阻塞 if ((configureBlocking(fd[0], JNI_FALSE) < 0) || (configureBlocking(fd[1], JNI_FALSE) < 0)) { JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed"); close(fd[0]); close(fd[1]); return 0; } } //高32位存讀端,低32位存寫端 return ((jlong) fd[0] << 32) | (jlong) fd[1]; }
JNI內部則通過pipe
創建管道。
對於管道的詳細邏輯可以看《Linux管道 – 系統調用pipe()函數實現》
fdToKey
在註冊時會將文件描述符的句柄和對應的SelectionKey保存到Map<Integer,SelectionKeyImpl> fdToKey
中
管道文件描述符
在EPollSelectorImpl創建的時候會使用IOUtil.makePipe(false)
調用創建一個管道,用於喚醒線程用。當線程中斷時通過向寫管道寫入一個位元組來喚醒線程,具體可以看doSelect邏輯。
EPollArrayWrapper
PollArrayWrapper
用於存放linux的epoll_event
結構。
typedef union epoll_data { void *ptr; int fd; __uint32_t u32; __uint64_t u64; } epoll_data_t; struct epoll_event { __uint32_t events; epoll_data_t data; };
創建EPoll文件描述符
在EPollArrayWrapper
創建時候會創建epoll文件描述符和epoll_event數組結構
EPollArrayWrapper() throws IOException { // creates the epoll file descriptor epfd = epollCreate(); // the epoll_event array passed to epoll_wait int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT; pollArray = new AllocatedNativeObject(allocationSize, true); pollArrayAddress = pollArray.address(); // eventHigh needed when using file descriptors > 64k if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE) eventsHigh = new HashMap<>(); } //最大不超過64K private static final int MAX_UPDATE_ARRAY_SIZE = AccessController.doPrivileged( new GetIntegerAction("sun.nio.ch.maxUpdateArraySize", Math.min(OPEN_MAX, 64*1024)));
EPollArrayWrapper
內部會為維護兩個結構,當句柄值小於MAX_UPDATE_ARRAY_SIZE
時會保存到數組結構中。否則會存儲到Map中。主要是優化效率。
private static final int MAX_UPDATE_ARRAY_SIZE = AccessController.doPrivileged( new GetIntegerAction("sun.nio.ch.maxUpdateArraySize", Math.min(OPEN_MAX, 64*1024)));
- 通過
epollCreate
方法創建epoll文件描述符,JNI調用底層的epoll_create
方法。傳入的參數位最大註冊的socket fd數量。
JNIEXPORT jint JNICALL Java_sun_nio_ch_EPoll_epollCreate(JNIEnv *env, jclass c) { /* * epoll_create expects a size as a hint to the kernel about how to * dimension internal structures. We can't predict the size in advance. */ int epfd = epoll_create(256); if (epfd < 0) { JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed"); } return epfd; }
epoll_create用於創建EPoll事件所需的內存空間,默認為256,在Linux 2.6.8以後,傳入的size就沒用了,底層會動態調整所需數據結構的大小。詳情可以看下epoll_create的方法描述
初始化epoll_event數組
epfd創建完後,創建epoll_event的數組,首先查詢epoll_event
結構的大小
private static final int SIZE_EPOLLEVENT = sizeofEPollEvent(); Java_sun_nio_ch_EPollArrayWrapper_sizeofEPollEvent(JNIEnv* env, jclass this) { return sizeof(struct epoll_event); }
查詢配置的文件描述符最大數量
private static final int OPEN_MAX = IOUtil.fdLimit(); private static final int NUM_EPOLLEVENTS = Math.min(OPEN_MAX, 8192); Java_sun_nio_ch_IOUtil_fdLimit(JNIEnv *env, jclass this) { struct rlimit rlp; if (getrlimit(RLIMIT_NOFILE, &rlp) < 0) { JNU_ThrowIOExceptionWithLastError(env, "getrlimit failed"); return -1; } if (rlp.rlim_max < 0 || rlp.rlim_max > java_lang_Integer_MAX_VALUE) { return java_lang_Integer_MAX_VALUE; } else { return (jint)rlp.rlim_max; } }
getrlimit
用於獲取資源使用限制,RLIMIT_NOFILE
獲取最大文件打開數量。對於getrlimit
詳細介紹可以看一下
Linux系統調用–getrlimit()與setrlimit()函數詳解
根據查詢到的epoll_event
結構大小和數量初始化數組大小。
int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT; pollArray = new AllocatedNativeObject(allocationSize, true);
在 EPollArrayWrapper
內部使用 AllocatedNativeObject
對象創建的堆外(native)內存對象。
將數組的首地址保存到pollArrayAddress
中,在調用epollWait
的時候需要傳遞該參數給JNI。
和Windows的PollArrayWrapper
一樣,EPollArrayWrapper
也暴露了讀寫FD和Event的方法供EPollSelectorImpl
使用。
void putEventOps(int i, int event) { int offset = SIZE_EPOLLEVENT * i + EVENT_OFFSET; pollArray.putInt(offset, event); } void putDescriptor(int i, int fd) { int offset = SIZE_EPOLLEVENT * i + FD_OFFSET; pollArray.putInt(offset, fd); } int getEventOps(int i) { int offset = SIZE_EPOLLEVENT * i + EVENT_OFFSET; return pollArray.getInt(offset); } int getDescriptor(int i) { int offset = SIZE_EPOLLEVENT * i + FD_OFFSET; return pollArray.getInt(offset); }
註冊
protected void implRegister(SelectionKeyImpl ski) { if (closed) throw new ClosedSelectorException(); SelChImpl ch = ski.channel; //獲取通道的句柄 int fd = Integer.valueOf(ch.getFDVal()); //加入到緩存中 fdToKey.put(fd, ski); //加入到數組緩存 pollWrapper.add(fd); keys.add(ski); }
- 在註冊的時候會將
SelectionKey
加入到fdToKey
和keys
,同時會將文件描述符加入到pollWrapper
pollWrapper.add(fd); void add(int fd) { // force the initial update events to 0 as it may be KILLED by a // previous registration. synchronized (updateLock) { assert !registered.get(fd); //初始化事件掩碼為0 setUpdateEvents(fd, (byte)0, true); } } private void setUpdateEvents(int fd, byte events, boolean force) { //小於MAX_UPDATE_ARRAY_SIZE存到數組中 if (fd < MAX_UPDATE_ARRAY_SIZE) { if ((eventsLow[fd] != KILLED) || force) { eventsLow[fd] = events; } } else { //大於MAX_UPDATE_ARRAY_SIZE存到map中 Integer key = Integer.valueOf(fd); if (!isEventsHighKilled(key) || force) { eventsHigh.put(key, Byte.valueOf(events)); } } } private boolean isEventsHighKilled(Integer key) { assert key >= MAX_UPDATE_ARRAY_SIZE; Byte value = eventsHigh.get(key); return (value != null && value == KILLED); }
若文件描述符的值為KILLED
(-1)時,該管道被釋放。不再加入。如上面所述,這裡會根據key的大小存放到mapeventsHigh
或位元組數組eventsLow
中。
在調用poll
的時候才會調用epollCtl
進行註冊。
int poll(long timeout) throws IOException { //更新epoll事件,實際調用`epollCtl`加入到epollfd中 updateRegistrations(); ... } private void updateRegistrations() { synchronized (updateLock) { int j = 0; while (j < updateCount) { int fd = updateDescriptors[j]; short events = getUpdateEvents(fd); boolean isRegistered = registered.get(fd); int opcode = 0; if (events != KILLED) { //已經註冊過 if (isRegistered) { //修改或刪除 opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL; } else { //新增 opcode = (events != 0) ? EPOLL_CTL_ADD : 0; } if (opcode != 0) { epollCtl(epfd, opcode, fd, events); if (opcode == EPOLL_CTL_ADD) { //增加到registered緩存是否已註冊 registered.set(fd); } else if (opcode == EPOLL_CTL_DEL) { registered.clear(fd); } } } j++; } updateCount = 0; } } private byte getUpdateEvents(int fd) { if (fd < MAX_UPDATE_ARRAY_SIZE) { return eventsLow[fd]; } else { Byte result = eventsHigh.get(Integer.valueOf(fd)); // result should never be null return result.byteValue(); } }
doSelect
protected int doSelect(long timeout) throws IOException { if (closed) throw new ClosedSelectorException(); //1. 刪除取消的key processDeregisterQueue(); try { begin(); //2. 獲取就緒文件描述符 pollWrapper.poll(timeout); } finally { end(); } //3. 再次刪除取消的key processDeregisterQueue(); //4. 將就緒的key加入到selectedKeys中 int numKeysUpdated = updateSelectedKeys(); //5. 若管道被喚醒清理喚醒的數據 if (pollWrapper.interrupted()) { // Clear the wakeup pipe pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0); synchronized (interruptLock) { pollWrapper.clearInterrupted(); IOUtil.drain(fd0); interruptTriggered = false; } } return numKeysUpdated; }
- 刪除取消的key,當channel關閉時,對應的Key會被取消,被取消的key會加入到
cancelledKeys
中。調用processDeregisterQueue遍歷所有的key進行卸載。
processDeregisterQueue(); //遍歷所有已取消的key,取消他們 void processDeregisterQueue() throws IOException { // Precondition: Synchronized on this, keys, and selectedKeys Set<SelectionKey> cks = cancelledKeys(); //遍歷每個key調用卸載 implDereg(ski); } protected void implDereg(SelectionKeyImpl ski) throws IOException { assert (ski.getIndex() >= 0); SelChImpl ch = ski.channel; int fd = ch.getFDVal(); //根據文件句柄值移除 fdToKey.remove(Integer.valueOf(fd)); //從堆外內存溢出epoll_event結構 pollWrapper.remove(fd); ski.setIndex(-1); keys.remove(ski); selectedKeys.remove(ski); //將key設置為無效 deregister((AbstractSelectionKey)ski); SelectableChannel selch = ski.channel(); if (!selch.isOpen() && !selch.isRegistered()) ((SelChImpl)selch).kill(); }
從pollWrapper
移除,會將句柄值設置為KILLED
(-1)
pollWrapper.remove(fd); void remove(int fd) { synchronized (updateLock) { //設置實現值為-1 取消 setUpdateEvents(fd, KILLED, false); // remove from epoll if (registered.get(fd)) { //從epool對象中刪除 epollCtl(epfd, EPOLL_CTL_DEL, fd, 0); registered.clear(fd); } } }
EPOLL_CTL_DEL操作符將文件描述符從epoll fd中移除。
- 獲取就緒文件描述符
通過調用epollWait
方法,獲取到已就緒的文件描述符,存放在pollArrayAddress
地址中。
pollWrapper.poll(timeout); int poll(long timeout) throws IOException { //更新epoll事件,實際調用`epollCtl`加入到epollfd中 updateRegistrations(); //獲取已就緒的文件句柄 updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd); //如是喚醒文件句柄,則跳過,設置interrupted=true for (int i=0; i<updated; i++) { if (getDescriptor(i) == incomingInterruptFD) { interruptedIndex = i; interrupted = true; break; } } return updated; }
-
再次嘗試刪除取消的key。
epollWait
阻塞的時候可能會有channel被關閉,因此需要再次調用刪除取消key。 -
將就緒的key加入到selectedKeys中
private int updateSelectedKeys() { int entries = pollWrapper.updated; int numKeysUpdated = 0; for (int i=0; i<entries; i++) { int nextFD = pollWrapper.getDescriptor(i); SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD)); // ski is null in the case of an interrupt if (ski != null) { int rOps = pollWrapper.getEventOps(i); if (selectedKeys.contains(ski)) { if (ski.channel.translateAndSetReadyOps(rOps, ski)) { numKeysUpdated++; } } else { ski.channel.translateAndSetReadyOps(rOps, ski); if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) { //加入到selectedKeys中 selectedKeys.add(ski); numKeysUpdated++; } } } } return numKeysUpdated; }
- 當
epollWait
是被管道喚醒時,則將管道數據都讀取出來以清除管道數據
EPoll有水平喚醒觸發和邊緣觸發兩種觸發模式,水平觸發有數據可讀,若不讀取完,下次調用poll時會一致被喚醒。而邊緣觸發則觸發一次後不處理,下次除非有新的事件到來否則不會再喚醒。邊緣觸發性能更好。這裡必須將管道數據全部讀取完才行,避免設置為水平觸發時管道一值喚醒。
當線程中斷時, 會調用wakeup
喚醒,向管道中寫入一個位元組數據使其讀事件就緒被喚醒。在前面的文章提到過線程中斷接口。
public Selector wakeup() { synchronized (interruptLock) { if (!interruptTriggered) { pollWrapper.interrupt(); interruptTriggered = true; } } return this; } JNIEXPORT void JNICALL Java_sun_nio_ch_EPollArrayWrapper_interrupt(JNIEnv *env, jobject this, jint fd) { int fakebuf[1]; fakebuf[0] = 1; if (write(fd, fakebuf, 1) < 0) { JNU_ThrowIOExceptionWithLastError(env,"write to interrupt fd failed"); } }
清理喚醒管道數據,將數據讀出來。
IOUtil.drain(fd0); JNIEXPORT jboolean JNICALL Java_sun_nio_ch_IOUtil_drain(JNIEnv *env, jclass cl, jint fd) { char buf[128]; int tn = 0; for (;;) { int n = read(fd, buf, sizeof(buf)); tn += n; if ((n < 0) && (errno != EAGAIN)) JNU_ThrowIOExceptionWithLastError(env, "Drain"); if (n == (int)sizeof(buf)) continue; return (tn > 0) ? JNI_TRUE : JNI_FALSE; } }
關閉EpollSelectorImpl
關閉EpollSelectorImpl時會將所有註冊的通道一同關閉
protected void implClose() throws IOException { if (closed) return; closed = true; // prevent further wakeup synchronized (interruptLock) { interruptTriggered = true; } //關閉管道文件描述符 FileDispatcherImpl.closeIntFD(fd0); FileDispatcherImpl.closeIntFD(fd1); //關閉epoll fd,並釋放堆外內存。 pollWrapper.closeEPollFD(); // it is possible selectedKeys = null; // 情理所有通道 Iterator<SelectionKey> i = keys.iterator(); while (i.hasNext()) { SelectionKeyImpl ski = (SelectionKeyImpl)i.next(); SelectableChannel selch = ski.channel(); if (!selch.isOpen() && !selch.isRegistered()) ((SelChImpl)selch).kill(); i.remove(); } fd0 = -1; fd1 = -1; } pollWrapper.closeEPollFD(); void closeEPollFD() throws IOException { //關閉epfd FileDispatcherImpl.closeIntFD(epfd); //釋放堆外內存 pollArray.free(); }
總結
本文對EPollSelectorImpl
的代碼實現進行詳細解析。相比WindowsSelectorImpl
的select模型而言,因為沒有最大文件描述符的限制,因此也無需調用poll多次。通過簡單的調用JNI方法輕易的實現了高性能的I/O模型。
至此,本系列NIO源碼分析章節已經結束。通過9篇文章對NIO的各個塊的源碼進行分析,為後續對Netty的源碼分析打下基礎。
相關文獻
- epoll原理詳解及epoll反應堆模型
- 徹底理解epoll
- Linux系統調用–getrlimit()與setrlimit()函數詳解
- Linux管道 – 系統調用pipe()函數實現
- epoll_create
- Epoll – 水平觸發和邊緣觸發
微信掃一掃二維碼關注訂閱號傑哥技術分享
出處:https://www.cnblogs.com/Jack-Blog/p/12394487.html
作者:傑哥很忙
本文使用「CC BY 4.0」創作共享協議。歡迎轉載,請在明顯位置給出出處及鏈接。