NIO-EPollSelectorIpml源碼分析



NIO-EPollSelectorIpml源碼分析

目錄

NIO-概覽
NIO-Buffer
NIO-Channel
NIO-Channel接口分析
NIO-SocketChannel源碼分析
NIO-FileChannel源碼分析
NIO-Selector源碼分析
NIO-WindowsSelectorImpl源碼分析
NIO-EPollSelectorIpml源碼分析

前言

本來是想學習Netty的,但是Netty是一個NIO框架,因此在學習netty之前,還是先梳理一下NIO的知識。通過剖析源碼理解NIO的設計原理。

本系列文章針對的是JDK1.8.0.161的源碼。

NIO-Selector源碼分析Selector的功能和創建過程進行了分析,本篇對Linux環境下JDK實現的EPollSelectorImpl源碼進行詳細講解。

本篇文章不會對EPoll算法進行詳細介紹,對epoll算法感興趣或還不了解的同學可以看epoll原理詳解及epoll反應堆模型先進行學習。

在詳細介紹EpollSelectorProvider之前我們先了解一下EPoll主要的三個步驟:

  1. 調用epoll_create建立一個epoll 對象(在epoll文件系統中給這個句柄分配資源);
  2. 調用epoll_ctl向epoll對象中添加或刪除文件句柄及監控事件。
  3. 調用epoll_wait收集發生事件的文件描述符。

初始化EPollSelectorProvider

NIO-Selector源碼分析提到,若沒有進行配置時,默認通過sun.nio.ch.DefaultSelectorProvider.create()創建SelectorProvider。Linux下的代碼路徑在jdksrcsolarisclassessunniochDefaultSelectorProvider.java。在其內部通過實際是創建了一個EPollSelectorProvider

創建EPollSelectorImpl

EPollSelectorProvider是用於創建EPollSelectorImpl的。

Selector.Open()->  SelectorProvider.provider()->  sun.nio.ch.DefaultSelectorProvider.create()->  EPollSelectorProvider.openSelector()->  new EPollSelectorImpl(this)  public class EPollSelectorProvider extends SelectorProviderImpl  {      public AbstractSelector openSelector() throws IOException {          return new EPollSelectorImpl(this);      }        public Channel inheritedChannel() throws IOException {          return InheritedChannel.getChannel();      }  }

inheritedChannel()可以返回系統默認SelectorProvider創建的通道,主要有些操作系統底層需要調用默認的通道。

EPollSelectorImpl結構

在詳細講解EPollSelectorImpl源碼之前,先了解EPollSelectorImpl的主要的數據結構和屬性。

名稱 作用
Map<Integer,SelectionKeyImpl> fdToKey 保存文件描述符句柄和的SelectionKey的映射關係
int fd0 管道的讀端文件描述符
int fd1 管道的寫端文件描述符
EPollArrayWrapper pollWrapper 調用底層Epoll算法的包裝類
   EPollSelectorImpl(SelectorProvider sp) throws IOException {      super(sp);      long pipeFds = IOUtil.makePipe(false);      fd0 = (int) (pipeFds >>> 32); //無符號移位      fd1 = (int) pipeFds;      pollWrapper = new EPollArrayWrapper();      pollWrapper.initInterrupt(fd0, fd1);      fdToKey = new HashMap<>();  }  void initInterrupt(int fd0, int fd1) {      outgoingInterruptFD = fd1;      incomingInterruptFD = fd0;      //將管道的讀取端註冊      epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);  }

pipeFds高32位存放的是通道read端的文件描述符FD,低32位存放的是write端的文件描述符。這裡做移位處理。

通過調用JNI的makePipe方法創建單向管道。

JNIEXPORT jlong JNICALL  Java_sun_nio_ch_IOUtil_makePipe(JNIEnv *env, jobject this, jboolean blocking)  {      int fd[2];        if (pipe(fd) < 0) {          JNU_ThrowIOExceptionWithLastError(env, "Pipe failed");          return 0;      }      if (blocking == JNI_FALSE) {          //配置阻塞          if ((configureBlocking(fd[0], JNI_FALSE) < 0)              || (configureBlocking(fd[1], JNI_FALSE) < 0)) {              JNU_ThrowIOExceptionWithLastError(env, "Configure blocking failed");              close(fd[0]);              close(fd[1]);              return 0;          }      }      //高32位存讀端,低32位存寫端      return ((jlong) fd[0] << 32) | (jlong) fd[1];  }  

JNI內部則通過pipe創建管道。

對於管道的詳細邏輯可以看《Linux管道 – 系統調用pipe()函數實現》

fdToKey

在註冊時會將文件描述符的句柄和對應的SelectionKey保存到Map<Integer,SelectionKeyImpl> fdToKey

管道文件描述符

在EPollSelectorImpl創建的時候會使用IOUtil.makePipe(false)調用創建一個管道,用於喚醒線程用。當線程中斷時通過向寫管道寫入一個位元組來喚醒線程,具體可以看doSelect邏輯。

EPollArrayWrapper

PollArrayWrapper用於存放linux的epoll_event結構。

 typedef union epoll_data {       void *ptr;       int fd;       __uint32_t u32;       __uint64_t u64;    } epoll_data_t;     struct epoll_event {       __uint32_t events;       epoll_data_t data;   };

創建EPoll文件描述符

EPollArrayWrapper創建時候會創建epoll文件描述符和epoll_event數組結構

EPollArrayWrapper() throws IOException {      // creates the epoll file descriptor      epfd = epollCreate();        // the epoll_event array passed to epoll_wait      int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;      pollArray = new AllocatedNativeObject(allocationSize, true);      pollArrayAddress = pollArray.address();        // eventHigh needed when using file descriptors > 64k      if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)          eventsHigh = new HashMap<>();  }  //最大不超過64K  private static final int MAX_UPDATE_ARRAY_SIZE = AccessController.doPrivileged(          new GetIntegerAction("sun.nio.ch.maxUpdateArraySize", Math.min(OPEN_MAX, 64*1024)));  

EPollArrayWrapper內部會為維護兩個結構,當句柄值小於MAX_UPDATE_ARRAY_SIZE時會保存到數組結構中。否則會存儲到Map中。主要是優化效率。

private static final int MAX_UPDATE_ARRAY_SIZE = AccessController.doPrivileged(          new GetIntegerAction("sun.nio.ch.maxUpdateArraySize", Math.min(OPEN_MAX, 64*1024)));  
  • 通過epollCreate方法創建epoll文件描述符,JNI調用底層的epoll_create方法。傳入的參數位最大註冊的socket fd數量。
JNIEXPORT jint JNICALL  Java_sun_nio_ch_EPoll_epollCreate(JNIEnv *env, jclass c) {      /*       * epoll_create expects a size as a hint to the kernel about how to       * dimension internal structures. We can't predict the size in advance.       */      int epfd = epoll_create(256);      if (epfd < 0) {         JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");      }      return epfd;  }

epoll_create用於創建EPoll事件所需的內存空間,默認為256,在Linux 2.6.8以後,傳入的size就沒用了,底層會動態調整所需數據結構的大小。詳情可以看下epoll_create的方法描述

初始化epoll_event數組

epfd創建完後,創建epoll_event的數組,首先查詢epoll_event結構的大小

private static final int SIZE_EPOLLEVENT  = sizeofEPollEvent();  Java_sun_nio_ch_EPollArrayWrapper_sizeofEPollEvent(JNIEnv* env, jclass this)  {      return sizeof(struct epoll_event);  }

查詢配置的文件描述符最大數量

private static final int OPEN_MAX         = IOUtil.fdLimit();  private static final int NUM_EPOLLEVENTS  = Math.min(OPEN_MAX, 8192);  Java_sun_nio_ch_IOUtil_fdLimit(JNIEnv *env, jclass this)  {      struct rlimit rlp;      if (getrlimit(RLIMIT_NOFILE, &rlp) < 0) {          JNU_ThrowIOExceptionWithLastError(env, "getrlimit failed");          return -1;      }      if (rlp.rlim_max < 0 || rlp.rlim_max > java_lang_Integer_MAX_VALUE) {          return java_lang_Integer_MAX_VALUE;      } else {          return (jint)rlp.rlim_max;      }  }

getrlimit用於獲取資源使用限制,RLIMIT_NOFILE獲取最大文件打開數量。對於getrlimit詳細介紹可以看一下
Linux系統調用–getrlimit()與setrlimit()函數詳解

根據查詢到的epoll_event結構大小和數量初始化數組大小。

int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;  pollArray = new AllocatedNativeObject(allocationSize, true);

EPollArrayWrapper 內部使用 AllocatedNativeObject對象創建的堆外(native)內存對象。
將數組的首地址保存到pollArrayAddress中,在調用epollWait的時候需要傳遞該參數給JNI

和Windows的PollArrayWrapper一樣,EPollArrayWrapper也暴露了讀寫FD和Event的方法供EPollSelectorImpl使用。

void putEventOps(int i, int event) {      int offset = SIZE_EPOLLEVENT * i + EVENT_OFFSET;      pollArray.putInt(offset, event);  }    void putDescriptor(int i, int fd) {      int offset = SIZE_EPOLLEVENT * i + FD_OFFSET;      pollArray.putInt(offset, fd);  }    int getEventOps(int i) {      int offset = SIZE_EPOLLEVENT * i + EVENT_OFFSET;      return pollArray.getInt(offset);  }    int getDescriptor(int i) {      int offset = SIZE_EPOLLEVENT * i + FD_OFFSET;      return pollArray.getInt(offset);  }

註冊

protected void implRegister(SelectionKeyImpl ski) {      if (closed)          throw new ClosedSelectorException();      SelChImpl ch = ski.channel;      //獲取通道的句柄      int fd = Integer.valueOf(ch.getFDVal());      //加入到緩存中      fdToKey.put(fd, ski);      //加入到數組緩存      pollWrapper.add(fd);      keys.add(ski);  }
  • 在註冊的時候會將SelectionKey加入到fdToKeykeys,同時會將文件描述符加入到pollWrapper
pollWrapper.add(fd);  void add(int fd) {      // force the initial update events to 0 as it may be KILLED by a      // previous registration.      synchronized (updateLock) {          assert !registered.get(fd);          //初始化事件掩碼為0          setUpdateEvents(fd, (byte)0, true);      }  }  private void setUpdateEvents(int fd, byte events, boolean force) {      //小於MAX_UPDATE_ARRAY_SIZE存到數組中      if (fd < MAX_UPDATE_ARRAY_SIZE) {          if ((eventsLow[fd] != KILLED) || force) {              eventsLow[fd] = events;          }      } else {          //大於MAX_UPDATE_ARRAY_SIZE存到map中          Integer key = Integer.valueOf(fd);          if (!isEventsHighKilled(key) || force) {              eventsHigh.put(key, Byte.valueOf(events));          }      }  }  private boolean isEventsHighKilled(Integer key) {      assert key >= MAX_UPDATE_ARRAY_SIZE;      Byte value = eventsHigh.get(key);      return (value != null && value == KILLED);  }  

若文件描述符的值為KILLED(-1)時,該管道被釋放。不再加入。如上面所述,這裡會根據key的大小存放到mapeventsHigh或位元組數組eventsLow中。

在調用poll的時候才會調用epollCtl進行註冊。

int poll(long timeout) throws IOException {      //更新epoll事件,實際調用`epollCtl`加入到epollfd中      updateRegistrations();      ...  }  private void updateRegistrations() {      synchronized (updateLock) {          int j = 0;          while (j < updateCount) {              int fd = updateDescriptors[j];              short events = getUpdateEvents(fd);              boolean isRegistered = registered.get(fd);              int opcode = 0;                if (events != KILLED) {                  //已經註冊過                  if (isRegistered) {                      //修改或刪除                      opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;                  } else {                      //新增                      opcode = (events != 0) ? EPOLL_CTL_ADD : 0;                  }                  if (opcode != 0) {                      epollCtl(epfd, opcode, fd, events);                      if (opcode == EPOLL_CTL_ADD) {                          //增加到registered緩存是否已註冊                          registered.set(fd);                      } else if (opcode == EPOLL_CTL_DEL) {                          registered.clear(fd);                      }                  }              }              j++;          }          updateCount = 0;      }  }  private byte getUpdateEvents(int fd) {      if (fd < MAX_UPDATE_ARRAY_SIZE) {          return eventsLow[fd];      } else {          Byte result = eventsHigh.get(Integer.valueOf(fd));          // result should never be null          return result.byteValue();      }  }

doSelect

  protected int doSelect(long timeout) throws IOException {      if (closed)          throw new ClosedSelectorException();      //1. 刪除取消的key      processDeregisterQueue();      try {          begin();          //2. 獲取就緒文件描述符          pollWrapper.poll(timeout);      } finally {          end();      }      //3. 再次刪除取消的key      processDeregisterQueue();      //4. 將就緒的key加入到selectedKeys中      int numKeysUpdated = updateSelectedKeys();      //5. 若管道被喚醒清理喚醒的數據      if (pollWrapper.interrupted()) {          // Clear the wakeup pipe          pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);          synchronized (interruptLock) {              pollWrapper.clearInterrupted();              IOUtil.drain(fd0);              interruptTriggered = false;          }      }      return numKeysUpdated;  }
  • 刪除取消的key,當channel關閉時,對應的Key會被取消,被取消的key會加入到cancelledKeys中。調用processDeregisterQueue遍歷所有的key進行卸載。
  processDeregisterQueue();  //遍歷所有已取消的key,取消他們  void processDeregisterQueue() throws IOException {      // Precondition: Synchronized on this, keys, and selectedKeys      Set<SelectionKey> cks = cancelledKeys();      //遍歷每個key調用卸載      implDereg(ski);  }  protected void implDereg(SelectionKeyImpl ski) throws IOException {      assert (ski.getIndex() >= 0);      SelChImpl ch = ski.channel;      int fd = ch.getFDVal();      //根據文件句柄值移除      fdToKey.remove(Integer.valueOf(fd));      //從堆外內存溢出epoll_event結構      pollWrapper.remove(fd);      ski.setIndex(-1);      keys.remove(ski);      selectedKeys.remove(ski);      //將key設置為無效      deregister((AbstractSelectionKey)ski);      SelectableChannel selch = ski.channel();      if (!selch.isOpen() && !selch.isRegistered())          ((SelChImpl)selch).kill();  } 

pollWrapper移除,會將句柄值設置為KILLED(-1)

pollWrapper.remove(fd);  void remove(int fd) {          synchronized (updateLock) {              //設置實現值為-1 取消              setUpdateEvents(fd, KILLED, false);              // remove from epoll              if (registered.get(fd)) {                  //從epool對象中刪除                  epollCtl(epfd, EPOLL_CTL_DEL, fd, 0);                  registered.clear(fd);              }          }      }

EPOLL_CTL_DEL操作符將文件描述符從epoll fd中移除。

  • 獲取就緒文件描述符

通過調用epollWait方法,獲取到已就緒的文件描述符,存放在pollArrayAddress地址中。

pollWrapper.poll(timeout);  int poll(long timeout) throws IOException {      //更新epoll事件,實際調用`epollCtl`加入到epollfd中      updateRegistrations();      //獲取已就緒的文件句柄      updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);      //如是喚醒文件句柄,則跳過,設置interrupted=true      for (int i=0; i<updated; i++) {          if (getDescriptor(i) == incomingInterruptFD) {              interruptedIndex = i;              interrupted = true;              break;          }      }      return updated;  }
  • 再次嘗試刪除取消的key。epollWait阻塞的時候可能會有channel被關閉,因此需要再次調用刪除取消key。

  • 將就緒的key加入到selectedKeys中

private int updateSelectedKeys() {          int entries = pollWrapper.updated;          int numKeysUpdated = 0;          for (int i=0; i<entries; i++) {              int nextFD = pollWrapper.getDescriptor(i);              SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));              // ski is null in the case of an interrupt              if (ski != null) {                  int rOps = pollWrapper.getEventOps(i);                  if (selectedKeys.contains(ski)) {                      if (ski.channel.translateAndSetReadyOps(rOps, ski)) {                          numKeysUpdated++;                      }                  } else {                      ski.channel.translateAndSetReadyOps(rOps, ski);                      if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {                          //加入到selectedKeys中                          selectedKeys.add(ski);                          numKeysUpdated++;                      }                  }              }          }          return numKeysUpdated;      }
  • epollWait是被管道喚醒時,則將管道數據都讀取出來以清除管道數據

EPoll有水平喚醒觸發和邊緣觸發兩種觸發模式,水平觸發有數據可讀,若不讀取完,下次調用poll時會一致被喚醒。而邊緣觸發則觸發一次後不處理,下次除非有新的事件到來否則不會再喚醒。邊緣觸發性能更好。這裡必須將管道數據全部讀取完才行,避免設置為水平觸發時管道一值喚醒。

當線程中斷時, 會調用wakeup喚醒,向管道中寫入一個位元組數據使其讀事件就緒被喚醒。在前面的文章提到過線程中斷接口

public Selector wakeup() {      synchronized (interruptLock) {          if (!interruptTriggered) {              pollWrapper.interrupt();              interruptTriggered = true;          }      }      return this;  }  JNIEXPORT void JNICALL  Java_sun_nio_ch_EPollArrayWrapper_interrupt(JNIEnv *env, jobject this, jint fd)  {      int fakebuf[1];      fakebuf[0] = 1;      if (write(fd, fakebuf, 1) < 0) {          JNU_ThrowIOExceptionWithLastError(env,"write to interrupt fd failed");      }  }

清理喚醒管道數據,將數據讀出來。

IOUtil.drain(fd0);  JNIEXPORT jboolean JNICALL  Java_sun_nio_ch_IOUtil_drain(JNIEnv *env, jclass cl, jint fd)  {      char buf[128];      int tn = 0;        for (;;) {          int n = read(fd, buf, sizeof(buf));          tn += n;          if ((n < 0) && (errno != EAGAIN))              JNU_ThrowIOExceptionWithLastError(env, "Drain");          if (n == (int)sizeof(buf))              continue;          return (tn > 0) ? JNI_TRUE : JNI_FALSE;      }  }  

關閉EpollSelectorImpl

關閉EpollSelectorImpl時會將所有註冊的通道一同關閉

      protected void implClose() throws IOException {          if (closed)              return;          closed = true;            // prevent further wakeup          synchronized (interruptLock) {              interruptTriggered = true;          }          //關閉管道文件描述符          FileDispatcherImpl.closeIntFD(fd0);          FileDispatcherImpl.closeIntFD(fd1);          //關閉epoll fd,並釋放堆外內存。          pollWrapper.closeEPollFD();          // it is possible          selectedKeys = null;            // 情理所有通道          Iterator<SelectionKey> i = keys.iterator();          while (i.hasNext()) {              SelectionKeyImpl ski = (SelectionKeyImpl)i.next();              SelectableChannel selch = ski.channel();              if (!selch.isOpen() && !selch.isRegistered())                  ((SelChImpl)selch).kill();              i.remove();          }            fd0 = -1;          fd1 = -1;      }  pollWrapper.closeEPollFD();  void closeEPollFD() throws IOException {      //關閉epfd      FileDispatcherImpl.closeIntFD(epfd);      //釋放堆外內存      pollArray.free();  }

總結

本文對EPollSelectorImpl的代碼實現進行詳細解析。相比WindowsSelectorImpl的select模型而言,因為沒有最大文件描述符的限制,因此也無需調用poll多次。通過簡單的調用JNI方法輕易的實現了高性能的I/O模型。

至此,本系列NIO源碼分析章節已經結束。通過9篇文章對NIO的各個塊的源碼進行分析,為後續對Netty的源碼分析打下基礎。

相關文獻

  1. epoll原理詳解及epoll反應堆模型
  2. 徹底理解epoll
  3. Linux系統調用–getrlimit()與setrlimit()函數詳解
  4. Linux管道 – 系統調用pipe()函數實現
  5. epoll_create
  6. Epoll – 水平觸發和邊緣觸發

20191127212134.png
微信掃一掃二維碼關注訂閱號傑哥技術分享
出處:https://www.cnblogs.com/Jack-Blog/p/12394487.html
作者:傑哥很忙
本文使用「CC BY 4.0」創作共享協議。歡迎轉載,請在明顯位置給出出處及鏈接。