浅谈NIO
- 2019 年 10 月 27 日
- 筆記
浅谈NIO
说到NIO大家都不会陌生,它是JDK中提供的IO工具集。 它又被称作为New I/O
或Non Blocking I/O
。相较于传统面向流的java.io
,nio是完全面向缓冲的I/O,它提供了更底层的操作。
如果你了解C语言那么你一定接触过标准I/O库(stdio.h
),其中实现的函数都是面向流的,而操作系统底层I/O函数(系统调用)都是基于缓冲去实现的(感兴趣可以参阅一些标准库的实现,例如glibc),标准I/O是在其上实现的高级API,它能对每个流的缓冲自动地进行管理,屏蔽掉了我们在管理缓冲上的复杂。实际上流也可以看作是一种很特殊的“缓冲”,比如将流看作一段段连续地缓冲块。
基于流的I/O是独立于操作系统的设计,它依赖操作系统底层的I/O模型,也带入了I/O阻塞的问题。值得一提的是它的出现比socket要早很多年,或者说在此之前I/O的阻塞可能都不构成一个问题。为什么这样说呢?我们先来讨论下什么是阻塞。
阻塞IO
我们都知道,CPU从寄存器中读取是最快的,其次CPU上的缓存,而读取磁盘相对来说是非常慢的。拿3.3GHz主频CPU为例,它的时钟周期为3ns
,假设固态硬盘一次顺序读取需要50μs
,那就是相当于16.6万个时钟周期,如果换算成秒的话CPU需要等待2天才有数据进入。
下图出自《Systems Performance》,表中列出一些事件的延迟时间。


所以调用一些直接与设备交互的函数(系统调用
或者系统函数
)CPU都会产生空闲,这种空闲会“阻塞”住进程。实际上大多数设备的操作,比如从硬盘上的文件系统中读取一个文本,除非是遇到硬件错误,它们都是能很快返回的。我们常常讨论的阻塞问题都是一些低速设备,例如网卡
、终端
和通道
等,它们大多是一些被动的IO,如果不能从对端读取到数据很可能就一直阻塞下去。
阻塞类似于一个长时间的睡眠
,在阻塞发生时进程处于一种假死
状态,这时进程除了能被信号中断外CPU
将不会继续往下执行指令。
考虑有以下服务器程序,serverSocket
为服务器套接字实例,readMsg
函数负责读取客户端套接字的逻辑。
while (true) { Socket socket = serverSocket.accpet(); readMsg(socket); }
我们可以使用telnet
连接上这个服务,但如果我们什么都不输入,进程将被一直阻塞在readMsg
函数。严重的是,我们的程序会成为了一对一
的服务器程序。如果此时的连接不断开,其他用户试图连入时也会被阻塞住,这样的用户体验是非常糟糕的。
看到CPU
会被读取函数阻塞住,可能有人就会想到现代CPU
都是多核架构,我们可以使用其它核去建立一个新连接就解决了。确实,可以使用多线程去改进它,考虑到创建线程的开销我们会用到线程池。
while (true) { Socket socket = serverSocket.accpet(); threadPool.submit(() -> readMsg(socket)); }
这样,我们就可以在新的线程中处理连接了,仅实现一个简单的web
服务器也能有比较不错的性能。但是资源始终是有限的,如果处理请求的函数都是需要长时间等待的又或者根本就是恶意的连接,它们还是会占满所有的资源,后续连接依然会被阻塞。
在套接字(socket
)的实现中,提供了相关的选项可以让发送
端或接收
端超时。它能让socket
在超过指定时间没有收到响应就返回一个错误而不是一直阻塞。JDK
的Socket API
也提供一个方法给套接字设置超时时间 – setSoTimeout(int)
,如果函数超出指定时间没有返回,那么将会抛出一个SocketTimeoutException
,经过修改我们得到以下的加强版。
serverSocket.setSoTimeout(200); while (true) { Socket socket = null; try { socket = serverSocket.accpet(); } catch (SocketTimeoutException e) { continue; } threadPool.submit(() -> { socket.setSoTimeout(50); readMsg(socket); }); }
通过设置超时时间得到一种非阻塞的假象,吞吐量得到稍微改善,但是依然没有本质上解决阻塞问题。
接下来我们来试试用NIO来解决阻塞
带来的问题。
NIO
在JDK标准实现中,NIO提供了与传统IO完全不同的API来完成同样的事,它也提供了更多、更复杂的IO模型。NIO主要包括四大基本组件 – 通道(Channel)
、选择器(Selector)
、缓冲(Buffer)
和字符集(Charsets)
,本文会简单介绍前三个。
通道(Channel)
Nio为打开的IO设备提供了不同的抽象 – Channel
,要用nio操纵一个设备需要一个Channel
对象。
关于Channel
的能力可以参阅官方文档中java.nio.channels
包下的接口介绍。需要提一下的是ByteChannel
,它实现了ReadableByteChannel
和 WritableByteChannel
,也就是说它同时具备读和写的能力,这是有别于流
的设计,因为大多数流得实现都只具备输入或输出中的一种能力。当然,从命名上已经能看出区别了,现实中的流
是单向的,而通道
可以是双向的。在NIO中操纵套接字的SocketChannel
也实现了ByteChannel
,所以我们可以直接使用它读写套接字。
ServerSocketChannel
和SocketChannel
这两个抽象类分别作为服务器套接字通道和客户端套接字通道的抽象,他们都继承了SelectableChannel
,这关系着套接字通道的另外两项非常重要的能力 – 非阻塞I/O
和多路复用
(multiplexing
)。 我们先讨论下非阻塞I/O
,它提供了一个configureBlocking(blooean)
方法,它用于设置套接字操作是否阻塞。这意味着当在打开的ServerSocket
或Socket
上设置非阻塞
,之前会被阻塞的地方都能立即返回,例如,在ServerSocket
使用accept
,没有请求时会立即返回一个null
。
值得一提的是,ServerSocketChannel
和SocketChannel
具体实现并没有包含在java.*
包中而是在sun.nio.*
,这部分的源码在Oracle提供的JDK中并没有公开。 感兴趣的同学可以去OpenJDK的源码中参照实现。
下面是我们使用NIO创建的一个服务端程序:
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(PORT)); serverSocketChannel.configureBlocking(false); while (true) { SocketChannel socketChannel = null; while ((socketChannel = serverSocketChannel.accept()) == null) { // 没有请求进入时 } socketChannel.configureBlocking(false); socketChannel.read(buff); }
我这里是使用一个轮询(polling
),这样的缺点是如果没有连接进入时,CPU
就会不停的执行指令,通过top
等工具我们也能很清楚的看到这时候CPU
的负荷很高。由于大多数的服务器程序都不是计算密集型的,我们需要适当的让CPU
空闲而不是一直去执行无意义的指令。这种模型是不适合直接在实际应用中使用的,一般它都会和多路复用
模型搭配使用。
多路复用(Selector)
其实操作系统设计者们早就考虑到阻塞式I/O
带的一些问题,所以在很早的时候操作系统中就有一种multiplexing
的I/O模型。早在1983年发布的BSD4.2
中就引入了系统调用select
,值得一提的是这个版本还首次引入了socket
套接字API,很难不让人联想它的存在就是为了解决套接字这类“低速”设备上的阻塞I/O问题的。
在Unix标准实现中提供的select
和poll
系统调用都可以说否是多路复用的实现,它们提供了让内核通知进程I/O已准备就绪
的能力,这样我们就能串行操纵多个打开I/O设备
。
比较有意思的是某些Unix操作系统中,比如Linux
和BSD
中都包含一些非标准的实现,它们具备更优的性能,感兴趣的同学可以参照epoll_create(2)
和kqueue(2)
。
在Jdk的实现中,java.nio
提供一个名为Selector
的抽象类,从名字可以看出它是具备类似于select
系统调用的功能。与SocketChannel
同样它的实现也在sun.nio.*
包中,它会根据操作系统提供不同的实现,例如,在linux
会使用epoll
而BSD
中会使用kqueue
,从而提供更好的性能。
我们可以直接使用它提供的静态工厂方法即可创建selector
:
Selector selector = Selector.open();
既然是要操作系统内核去通知进程,那自然需要有对应事件的处理。做过awt
或者web
的都应该清楚,如果要处理某一种事件(例如,点击一个表单上的按钮),就需要注册对应事件到事件监听器上。Selector
也同样,我们需要对创建的套接字注册监听事件。
前文提到SocketChannel
所继承的SelectableChannel
是为套接字channel提供多路复用能力。
通过文档我们在SelectableChannel
中找到一个SelectionKey register(Selector sel, int ops)
方法。注意,这里要注册一个事件到Selector
是使用SelectableChannel
的能力。
- 返回值
SelectionKey
代表着一个已经被注册到Selector
中的SelectableChannel
实例。 sel
自然是需要我们传入上面创建的selector
。ops
代表需要监听的事件, 它们被定义在中SelectionKey
中,例如需要监听accept
事件只需要调用register(selector, SelectionKey.OP_ACCEPT)
,也可以传入多个事件,需要用或运算符|
串起来SelectionKey.OP_READ | SelectionKey.OP_WRITE
,这是*nix
系统函数的惯用传参法。
接下来使用Selector
中的int select()
方法,它将返回到来事件的个数。这个方法将会一直阻塞到有事件发生,所以一般会使用另一个带long参数的版本int select(long timeout)
。它接收一个超时时间,当阻塞超时方法就会立即返回0
。
下面来演示一个多路复用服务端的实现:
serverSocketChannel.configureBlocking(false); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while (true ) { if (selector.select(TIME_OUT) == 0) { //... continue; } //... }
当有事件到来时,例如有客户端连入,select()
方法的返回事件个数使得程序跳出if
分支。
这时候调用Selector.selectedKeys()
会返回一个SelectionKey
集合,代表着需要处理的事件。
SelectionKey
中定义了一系列boolean is*()
方法对了应事件类型,这样我们就可以根据事件类型定义不同的操作
while (true) { ... Set<SelectionKey> set = selector.selectedKeys(); set.forEach((selectionKey) -> { if (selectionKey.isAcceptable()) { try { SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(16)); } catch (IOException e) { // } } if (selectionKey.isReadable()) { SocketChannel socketChannel = selectionKey.channel(); ByteBuffer buf = (ByteBuffer) selectionKey.attachment(); readMsg(socketChannel, buff); } ... }); // 使用完毕需要清空 set.clear(); }
在处理accept
事件时我们可以把建立的客户端套接字也注册到Selector
上,它使用SelectionKey.selector()
获取与之关联的Selector
。这里使用了之前register(Selector sel, int ops)
的一个重载版本, 它在原本基础上还需要额外的第三个参数 – 一个Object
数据,代表一个与SelectorKey
相关联的附件。这样我们在下次需要处理读取
或写入
事件时直接取出它即可,节省了每次创建缓冲的开销。
使用多路复用我们就可以在串行中处理很多个连接,当然这也不是没有上限,这个取决于操作系统中文件描述符
的上限,虽然说这个值也是可以修改的-_-
缓冲(Buffer)
对于操作系统来说,I/O
是一个非常昂贵的操作,比如在磁盘中读取,相较于随机访问内存(RAM
)磁盘的访问是非常缓慢的过程。所以,操作系统的设计中有很多机制去优化读和写,例如,在读取一个文件时会事先将部分读入内存中,这个步骤一般由内核来完成的。内核的代码运行在内核空间
,通常内核在读取数据时会事先将数据读入在内核空间
的缓冲区中。
我们知道用户所编写的代码是运行在用户空间
,是不能直接访问内核空间
的,内核提供一种名为系统调用(syscall)
的接口去完成内核空间
的访问。
内核空间的执行代码的代价也是相当昂贵的,因为稍有不慎就可能导致系统崩溃,所以内核中在系统调用
的实现中有很多的检查机制。大量使用系统调用会对我们的程序性能大打折扣,所以我们会考虑减少这类函数的访问。比如在I/O
系统调用中使用一个内存缓冲区
比如数组
,这样可以避免每读取一个字节都调用一次系统函数,所以用户缓冲区
的大小关联着程序的性能,当然性能也不是线性增长。
下表来自《The Linux Programming Interface》,作者对100m文本读取时使用不同大小缓冲区的测试结果:
BUF_SIZE |
Elapsed |
Total CPU |
User CPU |
System CPU |
---|---|---|---|---|
1 |
107.43 |
107.32 |
8.20 |
99.12 |
2 |
54.16 |
53.89 |
4.13 |
49.76 |
4 |
31.72 |
30.96 |
2.30 |
28.66 |
8 |
15.59 |
14.34 |
1.08 |
13.26 |
16 |
7.50 |
7.14 |
0.51 |
6.63 |
32 |
3.76 |
3.68 |
0.26 |
3.41 |
64 |
2.19 |
2.04 |
0.13 |
1.91 |
128 |
2.16 |
1.59 |
0.11 |
1.48 |
256 |
2.06 |
1.75 |
0.10 |
1.65 |
512 |
2.06 |
1.03 |
0.05 |
0.98 |
1024 |
2.05 |
0.65 |
0.02 |
0.63 |
4096 |
2.05 |
0.38 |
0.01 |
0.38 |
16384 |
2.05 |
0.34 |
0.00 |
0.33 |
65536 |
2.06 |
0.32 |
0.00 |
0.32 |
这也就是为什么很多编程语言中的IO
处理函数都需要提供一个数组
结构,IO
本质上也就是用户空间
和内核空间
的拷贝。扯远了,读取Channel
中的数据也是需要提供一个缓冲区的。NIO
中的缓冲是一个Buffer
对象,简单点理解它底层就是一个数组,提供比数组更多的方式去管理和操作数组中的元素,但是它的实现也不是完全依赖于数组。
Buffer
是一个抽象类,它有几种基本数据类型的实现,例如,HeapByteBuffer
、HeapIntBuffer
,它们的底层也维护着一个与之对应的数组结构,注意这个Heap所表达的含义,它对应了我们使用new
运算符在Heap
中所创建的数组。Buffer
一般通过静态方法allocate
去创建,当然创建方法还有可以一个数组入参的wrap(array)
方法。
ByteBuffer byteBuff1 = ByteBuffer.allocate(1024); ByteBuffer byteBuff2 = ByteBuffer.wrap(new byte[1024]);
在Java中数组是不能访问索引超过数组大小的元素,如果超过则抛出索引越界异常。Buffer
提供get(ini)
/put(int, T)
方法用于获取或放置指定位置的数据。既然底层是数组,那么Buffer
就有一个最大容量,它和底层数组的大小等价,是一个不会改变的值。在Buffer
中有一个capacity
属性代表着容量大小,它通过Buffer.capacity()
方法直接获取。
Buffer
还有一个核心概念limit
,它的值可以通过Buffer.limit()
方法直接获取,官方为其定义为第一个不可读/写的元素。也就是说从limit
开始到capacity
这段空间是不会被读取或写入的,用户只能访问索引从起始位置0
~ limit - 1
中的元素,使用大于(或等于)limit
位置上的元素会抛出索引越界异常。limit
的大小可以通过Buffer.limit(int)
进行修改但不能超过capacity
,limit
初始值和capacity
相等。
byte[] array = new byte[1024]; ByteBuffer byteBuff = ByteBuffer.wrap(array); array.length == byteBuff.capacity(); // true byteBuff.capacity() == byteBuff.limit(); // true byteBuff.get(1023);byteBuff.limit(1023); // limit = 1023 byteBuff.get(1023); // java.lang.IndexOutOfBoundsException
在linux内核中维护着一个打开文件表
,一个打开文件
对应着表中的一条记录,其中维护打开文件的偏移量
、状态
等信息。一些系统调用
可以改变这些信息。比如,在read
一个文件描述符(file descriptor)
时会隐式将偏移量作调整,下次读取时就会从该位置开始操作。
下图为文件描述符表、打开文件表、inode表之间的关系:


在Buffer
中也有这么一个类似偏移量的概念叫做position
,它的值可以通过Buffer.position()
方法直接获取,官方对其的定义是下一个要读或写的索引值。每次读取都会改变position
的值,但是无论如何都不会超过limit
,也就是说当position
抵达limit
时就无法用这个Buffer
实例读入或写出数据。可以通过Buffer.flip()
方法将limit
设置为当前position
位置并将position
初始化为0,此时就可以使用这个Buffer
去完成写
操作,这个过程也叫做读写切换。
while (fileChannel.read(byteBuff) > 0) { int p = byteBuff.position(); byteBuff.flip(); // limit = position; position = 0; p == byteBuff.limit(); // true process(byteBuff); }
还有一个和position
相关的核心概念mark
,在读取数据的过程中可以通过Buffer.mark()
将mark
的值置为position
。mark
的值没有办法直接获取,但可以通过使用reset()
将position
重置为mark
的值,reset()
方法不能在mark()
之前调用。
以上就是Buffer
中的几个核心概念,它们的之间的关系为:
0⩽mark⩽position⩽limit
⩽capacity
多了Buffer
这一层,我们就不用关心它们底层缓冲区具体是什么,通过实现一套API就能完成基本的I/O
操作。比如,通过DirectByteBuffer.allocateDirect()
创建的DirectByteBuffer
,它使用了堆外内存
实现Buffer
。又或者用FileChannel.map()
方法创建的MappedByteBuffer
,它将文件直接映射到内存中,是使用内存映射
实现的Buffer
,也是零拷贝
的一种实现。
Buffer
有很多高级用法就不一一叙述了,毕竟本文也不是介绍API的文章。
推荐读物
写到最后,我要推荐(安利)几本个人觉得非常不错的读物:
首先当然是Steven大神的 《UNIX网络编程 卷1 + 卷2》
,这是网络编程必读书目之一。
系统编程首先推荐 《Linux/Unix系统编程手册》
,系统编程专家级读物。当然apue
也非常不错,不过感觉这本可以与apue
互补。
Java 网络编程 《Java TCP/IP Socket编程》
,关于Java Socket API 和 NIO API做了很全面的介绍,虽然内容有点老。