從網絡通信的演進過程徹底搞懂Redis高性能通信的原理(全網最詳細,建議收藏)
- 2021 年 10 月 18 日
- 筆記
我們一直說Redis的性能很快,那為什麼快?Redis為了達到性能最大化,做了哪些方面的優化呢?
在深度解析Redis的數據結構
這篇文章中,其實從數據結構上分析了Redis性能高的一方面原因。
在目前的k-v數據庫的技術選型中,Redis幾乎是首選的用來實現高性能緩存的方案,它的性能有多快呢?
根據官方的基準測試數據,一台普通硬件配置的Linux機器上運行單個Redis實例,處理簡單命令(O(n)或者O(logn)),QPS可以達到8W,如果使用pipeline批處理功能,QPS最高可以達到10W。
Redis 為什麼那麼快
Redis的高性能主要依賴於幾個方面。
- C語言實現,C語言在一定程度上還是比Java語言性能要高一些,因為C語言不需要經過JVM進行翻譯。
- 純內存I/O,內存I/O比磁盤I/O性能更快
- I/O多路復用,基於epoll的I/O多路復用技術,實現高吞吐網絡I/O
- 單線程模型,單線程無法利用到多核CPU,但是在Redis中,性能瓶頸並不是在計算上,而是在I/O能力,所以單線程能夠滿足高並發的要求。 從另一個層面來說,單線程可以避免多線程的頻繁上下文切換以及同步鎖機制帶來的性能開銷。
下面我們分別從上述幾個方面進行展開說明,先來看網絡I/O的多路復用模型。
從請求處理開始分析
當我們在客戶端向Redis Server發送一條指令,並且得到Redis回復的整個過程中,Redis做了什麼呢?
要處理命令,則redis必須完整地接收客戶端的請求,並將命令解析出來,再將結果讀出來,通過網絡回寫到客戶端。整個工序分為以下幾個部分:
- 接收,通過TCP接收到命令,可能會歷經多次TCP包、ack、IO操作
- 解析,將命令取出來
- 執行,到對應的地方將value讀出來
- 返回,將value通過TCP返回給客戶端,如果value較大,則IO負荷會更重
其中解析和執行是純cpu/內存操作,而接收和返回主要是IO操作,首先我們先來看通信的過程。
網絡IO的通信原理
同樣,我也畫了一幅圖來描述網絡數據的傳輸流程
首先,對於TCP通信來說,每個TCP Socket的內核中都有一個發送緩衝區和一個接收緩衝區
接收緩衝區把數據緩存到內核,若應用進程一直沒有調用Socket的read方法進行讀取,那麼該數據會一直被緩存在接收緩衝區內。不管進程是否讀取Socket,對端發來的數據都會經過內核接收並緩存到Socket的內核接收緩衝區。
read所要做的工作,就是把內核接收緩衝區中的數據複製到應用層用戶的Buffer里。
進程調用Socket的send發送數據的時候,一般情況下是將數據從應用層用戶的Buffer里複製到Socket的內核發送緩衝區,然後send就會在上層返回。換句話說,send返回時,數據不一定會被發送到對端。
網卡中的緩衝區既不屬於內核空間,也不屬於用戶空間。它屬於硬件緩衝,允許網卡與操作系統之間有個緩衝;
內核緩衝區在內核空間,在內存中,用於內核程序,做為讀自或寫往硬件的數據緩衝區;
用戶緩衝區在用戶空間,在內存中,用於用戶程序,做為讀自或寫往硬件的數據緩衝區
網卡芯片收到網絡數據會以中斷的方式通知CPU,我有數據了,存在我的硬件緩衝里了,來讀我啊。
CPU收到這個中斷信號後,會調用相應的驅動接口函數從網卡的硬件緩衝里把數據讀到內核緩衝區,正常情況下會向上傳遞給TCP/IP模塊一層一層的處理。
NIO多路復用機制
Redis的通信採用的是多路復用機制,什麼是多路復用機制呢?
由於Redis是C語言實現,為了簡化大家的理解,我們採用Java語言來描述這個過程。
在理解多路復用之前,我們先來了解一下BIO。
BIO模型
在Java中,如果要實現網絡通信,我們會採用Socket套接字來完成。
Socket這不是一個協議,而是一個通信模型。其實它最初是BSD發明的,主要用來一台電腦的兩個進程間通信,然後把它用到了兩台電腦的進程間通信。所以,可以把它簡單理解為進程間通信,不是什麼高級的東西。主要做的事情不就是:
-
A發包:發請求包給某個已經綁定的端口(所以我們經常會訪問這樣的地址182.13.15.16:1235,1235就是端口);收到B的允許;然後正式發送;發送完了,告訴B要斷開鏈接;收到斷開允許,馬上斷開,然後發送已經斷開信息給B。
-
B收包:綁定端口和IP;然後在這個端口監聽;接收到A的請求,發允許給A,並做好接收準備,主要就是清理緩存等待接收新數據;然後正式接收;接受到斷開請求,允許斷開;確認斷開後,繼續監聽其它請求。
可見,Socket其實就是I/O操作,Socket並不僅限於網絡通信,在網絡通信中,它涵蓋了網絡層、傳輸層、會話層、表示層、應用層——其實這都不需要記,因為Socket通信時候用到了IP和端口,僅這兩個就表明了它用到了網絡層和傳輸層;而且它無視多台電腦通信的系統差別,所以它涉及了表示層;一般Socket都是基於一個應用程序的,所以會涉及到會話層和應用層。
構建基礎的BIO通信模型
BIOServerSocket
public class BIOServerSocket {
//先定義一個端口號,這個端口的值是可以自己調整的。
static final int DEFAULT_PORT=8080;
public static void main(String[] args) throws IOException {
//先定義一個端口號,這個端口的值是可以自己調整的。
//在服務器端,我們需要使用ServerSocket,所以我們先聲明一個ServerSocket變量
ServerSocket serverSocket=null;
//接下來,我們需要綁定監聽端口, 那我們怎麼做呢?只需要創建使用serverSocket實例
//ServerSocket有很多構造重載,在這裡,我們把前邊定義的端口傳入,表示當前
//ServerSocket監聽的端口是8080
serverSocket=new ServerSocket(DEFAULT_PORT);
System.out.println("啟動服務,監聽端口:"+DEFAULT_PORT);
//回顧一下前面我們講的內容,接下來我們就需要開始等待客戶端的連接了。
//所以我們要使用的是accept這個函數,並且當accept方法獲得一個客戶端請求時,會返回
//一個socket對象, 這個socket對象讓服務器可以用來和客戶端通信的一個端點。
//開始等待客戶端連接,如果沒有客戶端連接,就會一直阻塞在這個位置
Socket socket=serverSocket.accept();
//很可能有多個客戶端來發起連接,為了區分客戶端,咱們可以輸出客戶端的端口號
System.out.println("客戶端:"+socket.getPort()+"已連接");
//一旦有客戶端連接過來,我們就可以用到IO來獲得客戶端傳過來的數據。
//使用InputStream來獲得客戶端的輸入數據
//bufferedReader大家還記得吧,他維護了一個緩衝區可以減少數據源讀取的頻率
BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream()));
String clientStr=bufferedReader.readLine(); //讀取一行信息
System.out.println("客戶端發了一段消息:"+clientStr);
//服務端收到數據以後,可以給到客戶端一個回復。這裡咱們用到BufferedWriter
BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
bufferedWriter.write("我已經收到你的消息了\n");
bufferedWriter.flush(); //清空緩衝區觸發消息發送
}
}
BIOClientSocket
public class BIOClientSocket {
static final int DEFAULT_PORT=8080;
public static void main(String[] args) throws IOException {
//在客戶端這邊,咱們使用socket來連接到指定的ip和端口
Socket socket=new Socket("localhost",8080);
//使用BufferedWriter,像服務器端寫入一個消息
BufferedWriter bufferedWriter=new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
bufferedWriter.write("我是客戶端Client-01\n");
bufferedWriter.flush();
BufferedReader bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream()));
String serverStr=bufferedReader.readLine(); //通過bufferedReader讀取服務端返回的消息
System.out.println("服務端返回的消息:"+serverStr);
}
}
上述代碼構建了一個簡單的BIO通信模型,也就是服務端建立一個監聽,客戶端向服務端發送一個消息,實現簡單的網絡通信,那BIO有什麼弊端呢?
我們通過對BIOServerSocket進行改造,關注case1和case2部分。
- case1: 增加了while循環,實現重複監聽
- case2: 當服務端收到客戶端的請求後,不直接返回,而是等待20s。
public class BIOServerSocket {
//先定義一個端口號,這個端口的值是可以自己調整的。
static final int DEFAULT_PORT=8080;
public static void main(String[] args) throws IOException, InterruptedException {
ServerSocket serverSocket=null;
serverSocket=new ServerSocket(DEFAULT_PORT);
System.out.println("啟動服務,監聽端口:"+DEFAULT_PORT);
while(true) { //case1: 增加循環,允許循環接收請求
Socket socket = serverSocket.accept();
System.out.println("客戶端:" + socket.getPort() + "已連接");
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String clientStr = bufferedReader.readLine(); //讀取一行信息
System.out.println("客戶端發了一段消息:" + clientStr);
Thread.sleep(20000); //case2: 修改:增加等待時間
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
bufferedWriter.write("我已經收到你的消息了\n");
bufferedWriter.flush(); //清空緩衝區觸發消息發送
}
}
}
接着,把BIOClientSocket複製兩份(client1、client2),同時向BIOServerSocket發起請求。
運行後看到的現象應該是: client1先發送請求到Server端,由於Server端等待20s才返回,導致client2的請求一直被阻塞。
這個情況會導致一個問題,如果服務端在同一個時刻只能處理一個客戶端的連接,而如果一個網站同時有1000個用戶訪問,那麼剩下的999個用戶都需要等待,而這個等待的耗時取決於前面的請求的處理時長,如圖4-2所示。
基於多線程優化BIO
為了讓服務端能夠同時處理更多的客戶端連接,避免因為某個客戶端連接阻塞導致後續請求被阻塞,於是引入多線程技術,代碼如下。
ServerSocket
public static void main(String[] args) throws IOException, InterruptedException {
final int DEFAULT_PORT=8080;
ServerSocket serverSocket=null;
serverSocket=new ServerSocket(DEFAULT_PORT);
System.out.println("啟動服務,監聽端口:"+DEFAULT_PORT);
ExecutorService executorService= Executors.newFixedThreadPool(5);
while(true) {
Socket socket = serverSocket.accept();
executorService.submit(new SocketThread(socket));
}
}
SocketThread
public class SocketThread implements Runnable{
Socket socket;
public SocketThread(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
System.out.println("客戶端:" + socket.getPort() + "已連接");
try {
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
String clientStr = null; //讀取一行信息
clientStr = bufferedReader.readLine();
System.out.println("客戶端發了一段消息:" + clientStr);
Thread.sleep(20000);
BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream()));
bufferedWriter.write("我已經收到你的消息了\n");
bufferedWriter.flush(); //清空緩衝區觸發消息發送
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
如圖4-3所示,當引入了多線程之後,每個客戶端的鏈接(Socket),我們可以直接給到線程池去執行,而由於這個過程是異步的,所以並不會同步阻塞影響後續鏈接的監聽,因此在一定程度上可以提升服務端鏈接的處理數量。
NIO非阻塞IO
使用多線程的方式來解決這個問題,仍然有一個缺點,線程的數量取決於硬件配置,所以線程數量是有限的,如果請求量比較大的時候,線程本身會收到限制從而並發量也不會太高。那怎麼辦呢,我們可以採用非阻塞IO。
NIO 從JDK1.4 提出的,本意是New IO,它的出現為了彌補原本IO的不足,提供了更高效的方式,提出一個通道(channel)的概念,在IO中它始終以流的形式對數據的傳輸和接受,下面我們演示一下NIO的使用。
NioServerSocket
public class NioServerSocket {
public static void main(String[] args) {
try {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
while (true) {
SocketChannel socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
//讀取數據
ByteBuffer buffer = ByteBuffer.allocate(1024);
socketChannel.read(buffer);
System.out.println(new String(buffer.array()));
//寫出數據
Thread.sleep(10000); //阻塞一段時間
//當數據讀取到緩衝區之後,接下來就需要把緩衝區的數據寫出到通道,而在寫出之前必須要調用flip方法,實際上就是重置一個有效位元組範圍,然後把這個數據接觸到通道。
buffer.flip();
socketChannel.write(buffer);//寫出數據
} else {
Thread.sleep(1000);
System.out.println("連接未就緒");
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
NioClientSocket
public class NioClientSocket {
public static void main(String[] args) {
try {
SocketChannel socketChannel= SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("localhost",8080));
if(socketChannel.isConnectionPending()){
socketChannel.finishConnect();
}
ByteBuffer byteBuffer= ByteBuffer.allocate(1024);
byteBuffer.put("Hello I'M SocketChannel Client".getBytes());
byteBuffer.flip();
socketChannel.write(byteBuffer);
//讀取服務端數據
byteBuffer.clear();
while(true) {
int i = socketChannel.read(byteBuffer);
if (i > 0) {
System.out.println("收到服務端的數據:" + new String(byteBuffer.array()));
} else {
System.out.println("服務端數據未準備好");
Thread.sleep(1000);
}
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
}
}
所謂的NIO(非阻塞IO),其實就是取消了IO阻塞和連接阻塞,當服務端不存在阻塞的時候,就可以不斷輪詢處理客戶端的請求,如圖4-4所示,表示NIO下的運行流程。
上述這種NIO的使用方式,仍然存在一個問題,就是客戶端或者服務端需要通過一個線程不斷輪詢才能獲得結果,而這個輪詢過程中會浪費線程資源。
多路復用IO
大家站在全局的角度再思考一下整個過程,有哪些地方可以優化呢?
我們回到NIOClientSocket中下面這段代碼,當客戶端通過read
方法去讀取服務端返回的數據時,如果此時服務端數據未準備好,對於客戶端來說就是一次無效的輪詢。
我們能不能夠設計成,當客戶端調用read
方法之後,不僅僅不阻塞,同時也不需要輪詢。而是等到服務端的數據就緒之後, 告訴客戶端。然後客戶端再去讀取服務端返回的數據呢?
就像點外賣一樣,我們在網上下單之後,繼續做其他事情,等到外賣到了公司,外賣小哥主動打電話告訴你,你直接去前台取餐即可。
while(true) {
int i = socketChannel.read(byteBuffer);
if (i > 0) {
System.out.println("收到服務端的數據:" + new String(byteBuffer.array()));
} else {
System.out.println("服務端數據未準備好");
Thread.sleep(1000);
}
}
所以為了優化這個問題,引入了多路復用機制。
I/O多路復用的本質是通過一種機制(系統內核緩衝I/O數據),讓單個進程可以監視多個文件描述符,一旦某個描述符就緒(一般是讀就緒或寫就緒),能夠通知程序進行相應的讀寫操作
什麼是fd:在linux中,內核把所有的外部設備都當成是一個文件來操作,對一個文件的讀寫會調用內核提供的系統命令,返回一個fd(文件描述符)。而對於一個socket的讀寫也會有相應的文件描述符,成為socketfd。
常見的IO多路復用方式有【select、poll、epoll】,都是Linux API提供的IO復用方式,那麼接下來重點講一下select、和epoll這兩個模型
-
select:進程可以通過把一個或者多個fd傳遞給select系統調用,進程會阻塞在select操作上,這樣select可以幫我們檢測多個fd是否處於就緒狀態,這個模式有兩個缺點
- 由於他能夠同時監聽多個文件描述符,假如說有1000個,這個時候如果其中一個fd 處於就緒狀態了,那麼當前進程需要線性輪詢所有的fd,也就是監聽的fd越多,性能開銷越大。
- 同時,select在單個進程中能打開的fd是有限制的,默認是1024,對於那些需要支持單機上萬的TCP連接來說確實有點少
-
epoll:linux還提供了epoll的系統調用,epoll是基於事件驅動方式來代替順序掃描,因此性能相對來說更高,主要原理是,當被監聽的fd中,有fd就緒時,會告知當前進程具體哪一個fd就緒,那麼當前進程只需要去從指定的fd上讀取數據即可,另外,epoll所能支持的fd上線是操作系統的最大文件句柄,這個數字要遠遠大於1024
【由於epoll能夠通過事件告知應用進程哪個fd是可讀的,所以我們也稱這種IO為異步非阻塞IO,當然它是偽異步的,因為它還需要去把數據從內核同步複製到用戶空間中,真正的異步非阻塞,應該是數據已經完全準備好了,我只需要從用戶空間讀就行】
I/O多路復用的好處是可以通過把多個I/O的阻塞復用到同一個select的阻塞上,從而使得系統在單線程的情況下可以同時處理多個客戶端請求。它的最大優勢是系統開銷小,並且不需要創建新的進程或者線程,降低了系統的資源開銷,它的整體實現思想如圖4-5所示。
客戶端請求到服務端後,此時客戶端在傳輸數據過程中,為了避免Server端在read客戶端數據過程中阻塞,服務端會把該請求註冊到Selector復路器上,服務端此時不需要等待,只需要啟動一個線程,通過selector.select()阻塞輪詢復路器上就緒的channel即可,也就是說,如果某個客戶端連接數據傳輸完成,那麼select()方法會返回就緒的channel,然後執行相關的處理即可。
NIOServer的實現如下
測試訪問的時候,直接在cmd中通過telnet連接NIOServer,便可發送信息。
public class NIOServer implements Runnable{
Selector selector;
ServerSocketChannel serverSocketChannel;
public NIOServer(int port) throws IOException {
selector=Selector.open(); //多路復用器
serverSocketChannel=ServerSocketChannel.open();
//綁定監聽端口
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);//非阻塞配置
//針對serverSocketChannel註冊一個ACCEPT連接監聽事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
}
@Override
public void run() {
while(!Thread.interrupted()){
try {
selector.select(); //阻塞等待事件就緒
Set selected=selector.selectedKeys(); //得到事件列表
Iterator it=selected.iterator();
while(it.hasNext()){
dispatch((SelectionKey) it.next()); //分發事件
it.remove(); //移除當前時間
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void dispatch(SelectionKey key) throws IOException {
if(key.isAcceptable()){ //如果是客戶端的連接事件,則需要針對該連接註冊讀寫事件
register(key);
}else if(key.isReadable()){
read(key);
}else if(key.isWritable()){
write(key);
}
}
private void register(SelectionKey key) throws IOException {
//得到事件對應的連接
ServerSocketChannel server=(ServerSocketChannel)key.channel();
SocketChannel channel=server.accept(); //獲得客戶端的鏈接
channel.configureBlocking(false);
//把當前客戶端連接註冊到selector上,註冊事件為READ,
// 也就是當前channel可讀時,就會觸發事件,然後讀取客戶端的數據
channel.register(this.selector,SelectionKey.OP_READ);
}
private void read(SelectionKey key) throws IOException {
SocketChannel channel=(SocketChannel)key.channel();
ByteBuffer byteBuffer= ByteBuffer.allocate(1024);
channel.read(byteBuffer); //把數據從channel讀取到緩衝區
System.out.println("server receive msg:"+new String(byteBuffer.array()));
}
private void write(SelectionKey key) throws IOException {
SocketChannel channel=(SocketChannel)key.channel();
//寫一個信息給到客戶端
channel.write(ByteBuffer.wrap("hello Client,I'm NIO Server\r\n".getBytes()));
}
public static void main(String[] args) throws IOException {
NIOServer server=new NIOServer(8888);
new Thread(server).start();
}
}
事實上NIO已經解決了上述BIO暴露的下面兩個問題:
- 同步阻塞IO,讀寫阻塞,線程等待時間過長。
- 在制定線程策略的時候,只能根據CPU的數目來限定可用線程資源,不能根據連接並發數目來制定,也就是連接有限制。否則很難保證對客戶端請求的高效和公平。
到這裡為止,通過NIO的多路復用機制,解決了IO阻塞導致客戶端連接處理受限的問題,服務端只需要一個線程就可以維護多個客戶端,並且客戶端的某個連接如果準備就緒時,會通過事件機制告訴應用程序某個channel可用,應用程序通過select方法選出就緒的channel進行處理。
單線程Reactor 模型(高性能I/O設計模式)
了解了NIO多路復用後,就有必要再和大家說一下Reactor多路復用高性能I/O設計模式,Reactor本質上就是基於NIO多路復用機制提出的一個高性能IO設計模式,它的核心思想是把響應IO事件和業務處理進行分離,通過一個或者多個線程來處理IO事件,然後將就緒得到事件分發到業務處理handlers線程去異步非阻塞處理,如圖4-6所示。
Reactor模型有三個重要的組件:
- Reactor :將I/O事件發派給對應的Handler
- Acceptor :處理客戶端連接請求
- Handlers :執行非阻塞讀/寫
下面演示一個單線程的Reactor模型。
Reactor
Reactor 負責響應IO事件,一旦發生,廣播發送給相應的Handler去處理。
public class Reactor implements Runnable{
private final Selector selector;
private final ServerSocketChannel serverSocketChannel;
public Reactor(int port) throws IOException {
//創建選擇器
selector= Selector.open();
//創建NIO-Server
serverSocketChannel=ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);
SelectionKey key=serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
// 綁定一個附加對象
key.attach(new Acceptor(selector,serverSocketChannel));
}
@Override
public void run() {
while(!Thread.interrupted()){
try {
selector.select(); //阻塞等待就緒事件
Set selectionKeys=selector.selectedKeys();
Iterator it=selectionKeys.iterator();
while(it.hasNext()){
dispatch((SelectionKey) it.next());
it.remove();
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
public void dispatch(SelectionKey key){
//調用之前註冊時附加的對象,也就是attach附加的acceptor
Runnable r=(Runnable)key.attachment();
if(r!=null){
r.run();
}
}
public static void main(String[] args) throws IOException {
new Thread(new Reactor(8888)).start();
}
}
Acceptor
public class Acceptor implements Runnable{
private Selector selector;
private ServerSocketChannel serverSocketChannel;
public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
this.selector = selector;
this.serverSocketChannel = serverSocketChannel;
}
@Override
public void run() {
SocketChannel channel;
try {
channel=serverSocketChannel.accept();
System.out.println(channel.getRemoteAddress()+": 收到一個客戶端連接");
channel.configureBlocking(false);
//當channel連接中數據就緒時,調用DispatchHandler來處理channel
//巧妙使用了SocketChannel的attach功能,將Hanlder和可能會發生事件的channel鏈接在一起,當發生事件時,可以立即觸發相應鏈接的Handler。
channel.register(selector, SelectionKey.OP_READ,new DispatchHandler(channel));
} catch (IOException e) {
e.printStackTrace();
}
}
}
Handler
public class DispatchHandler implements Runnable{
private SocketChannel channel;
public DispatchHandler(SocketChannel channel) {
this.channel = channel;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"---handler"); //case: 打印當前線程名稱,證明I/O是同一個線程來處理。
ByteBuffer buffer=ByteBuffer.allocate(1024);
int len=0,total=0;
String msg="";
try {
do {
len = channel.read(buffer);
if (len > 0) {
total += len;
msg += new String(buffer.array());
}
buffer.clear();
} while (len > buffer.capacity());
System.out.println(channel.getRemoteAddress()+":Server Receive msg:"+msg);
}catch (Exception e){
e.printStackTrace();
if(channel!=null){
try {
channel.close();
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
}
}
演示方式,通過window的cmd窗口,使用telnet 192.168.1.102 8888 連接到Server端進行數據通信;也可以通過下面這樣一個客戶端程序來訪問。
ReactorClient
public class ReactorClient {
private static Selector selector;
public static void main(String[] args) throws IOException {
selector=Selector.open();
//創建一個連接通道連接指定的server
SocketChannel socketChannel= SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("192.168.1.102",8888));
socketChannel.register(selector, SelectionKey.OP_CONNECT);
while(true){
selector.select();
Set<SelectionKey> selectionKeys=selector.selectedKeys();
Iterator<SelectionKey> iterator=selectionKeys.iterator();
while(iterator.hasNext()){
SelectionKey key=iterator.next();
iterator.remove();
if(key.isConnectable()){
handleConnection(key);
}else if(key.isReadable()){
handleRead(key);
}
}
}
}
private static void handleConnection(SelectionKey key) throws IOException {
SocketChannel socketChannel=(SocketChannel)key.channel();
if(socketChannel.isConnectionPending()){
socketChannel.finishConnect();
}
socketChannel.configureBlocking(false);
while(true) {
Scanner in = new Scanner(System.in);
String msg = in.nextLine();
socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
socketChannel.register(selector,SelectionKey.OP_READ);
}
}
private static void handleRead(SelectionKey key) throws IOException {
SocketChannel channel=(SocketChannel)key.channel();
ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
channel.read(byteBuffer);
System.out.println("client receive msg:"+new String(byteBuffer.array()));
}
}
這是最基本的單Reactor單線程模型(整體的I/O操作是由同一個線程完成的)。
其中Reactor線程,負責多路分離套接字,有新連接到來觸發connect 事件之後,交由Acceptor進行處理,有IO讀寫事件之後交給hanlder 處理。
Acceptor主要任務就是構建handler ,在獲取到和client相關的SocketChannel之後 ,綁定到相應的hanlder上,對應的SocketChannel有讀寫事件之後,基於racotor 分發,hanlder就可以處理了(所有的IO事件都綁定到selector上,有Reactor分發)
Reactor 模式本質上指的是使用 I/O 多路復用(I/O multiplexing) + 非阻塞 I/O(non-blocking I/O)的模式。
多線程單Reactor模型
單線程Reactor這種實現方式有存在着缺點,從實例代碼中可以看出,handler的執行是串行的,如果其中一個handler處理線程阻塞將導致其他的業務處理阻塞。由於handler和reactor在同一個線程中的執行,這也將導致新的無法接收新的請求,我們做一個小實驗:
- 在上述Reactor代碼的DispatchHandler的run方法中,增加一個Thread.sleep()。
- 打開多個客戶端窗口連接到Reactor Server端,其中一個窗口發送一個信息後被阻塞,另外一個窗口再發信息時由於前面的請求阻塞導致後續請求無法被處理。
為了解決這種問題,有人提出使用多線程的方式來處理業務,也就是在業務處理的地方加入線程池異步處理,將reactor和handler在不同的線程來執行,如圖4-7所示。
多線程改造-MultiDispatchHandler
我們直接將4.2.5小節中的Reactor單線程模型改成多線程,其實我們就是把IO阻塞的問題通過異步的方式做了優化,代碼如下,
public class MultiDispatchHandler implements Runnable{
private SocketChannel channel;
public MultiDispatchHandler(SocketChannel channel) {
this.channel = channel;
}
private static Executor executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() << 1);
@Override
public void run() {
processor();
}
private void processor(){
executor.execute(new ReaderHandler(channel));
}
public static class ReaderHandler implements Runnable{
private SocketChannel channel;
public ReaderHandler(SocketChannel socketChannel) {
this.channel = socketChannel;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName()+"---handler"); //case: 打印當前線程名稱,證明I/O是同一個線程來處理。
ByteBuffer buffer= ByteBuffer.allocate(1024);
int len=0;
String msg="";
try {
do {
len = channel.read(buffer);
if (len > 0) {
msg += new String(buffer.array());
}
buffer.clear();
} while (len > buffer.capacity());
if(len>0) {
System.out.println(channel.getRemoteAddress() + ":Server Receive msg:" + msg);
}
}catch (Exception e){
e.printStackTrace();
if(channel!=null){
try {
channel.close();
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
}
}
}
Acceptor
public class Acceptor implements Runnable{
private Selector selector;
private ServerSocketChannel serverSocketChannel;
public Acceptor(Selector selector, ServerSocketChannel serverSocketChannel) {
this.selector = selector;
this.serverSocketChannel = serverSocketChannel;
}
@Override
public void run() {
SocketChannel channel;
try {
channel=serverSocketChannel.accept();
System.out.println(channel.getRemoteAddress()+": 收到一個客戶端連接");
channel.configureBlocking(false);
//當channel連接中數據就緒時,調用DispatchHandler來處理channel
//巧妙使用了SocketChannel的attach功能,將Hanlder和可能會發生事件的channel鏈接在一起,當發生事件時,可以立即觸發相應鏈接的Handler。
channel.register(selector, SelectionKey.OP_READ,new MultiDispatchHandler(channel));
} catch (IOException e) {
e.printStackTrace();
}
}
}
多線程Reactor總結
在多線程Reactor模型中,添加了一個工作者線程池,並將非I/O操作從Reactor線程中移出轉交給工作者線程池來執行。這樣能夠提高Reactor線程的I/O響應,不至於因為一些耗時的業務邏輯而延遲對後面I/O請求的處理。
多Reactor多線程模式(主從多Reactor模型)
在多線程單Reactor模型中,我們發現所有的I/O操作是由一個Reactor來完成,而Reactor運行在單個線程中,它需要處理包括Accept()
/read()
/write
/connect
操作,對於小容量的場景,影響不大。但是對於高負載、大並發或大數據量的應用場景時,容易成為瓶頸,主要原因如下:
- 一個NIO線程同時處理成百上千的鏈路,性能上無法支撐,即便NIO線程的CPU負荷達到100%,也無法滿足海量消息的讀取和發送;
- 當NIO線程負載過重之後,處理速度將變慢,這會導致大量客戶端連接超時,超時之後往往會進行重發,這更加重了NIO線程的負載,最終會導致大量消息積壓和處理超時,成為系統的性能瓶頸;
所以,我們還可以更進一步優化,引入多Reactor多線程模式,如圖4-8所示,Main Reactor負責接收客戶端的連接請求,然後把接收到的請求傳遞給SubReactor(其中subReactor可以有多個),具體的業務IO處理由SubReactor完成。
Multiple Reactors 模式通常也可以等同於 Master-Workers 模式,比如 Nginx 和 Memcached 等就是採用這種多線程模型,雖然不同的項目實現細節略有區別,但總體來說模式是一致的。
-
Acceptor,請求接收者,在實踐時其職責類似服務器,並不真正負責連接請求的建立,而只將其請求委託 Main Reactor 線程池來實現,起到一個轉發的作用。
-
Main Reactor,主 Reactor 線程組,主要負責連接事件,並將IO讀寫請求轉發到 SubReactor 線程池。
-
Sub Reactor,Main Reactor 通常監聽客戶端連接後會將通道的讀寫轉發到 Sub Reactor 線程池中一個線程(負載均衡),負責數據的讀寫。在 NIO 中 通常註冊通道的讀(OP_READ)、寫事件(OP_WRITE)。
MultiplyReactor
public class MultiplyReactor {
public static void main(String[] args) throws IOException {
MultiplyReactor mr = new MultiplyReactor(8888);
mr.start();
}
private static final int POOL_SIZE = Runtime.getRuntime().availableProcessors();
// Reactor(Selector) 線程池,其中一個線程被 mainReactor 使用,剩餘線程都被 subReactor 使用
static Executor mainReactorExecutor = Executors.newFixedThreadPool(POOL_SIZE);
// 主 Reactor,接收連接,把 SocketChannel 註冊到從 Reactor 上
private Reactor mainReactor;
private int port;
public MultiplyReactor(int port) {
try {
this.port = port;
mainReactor = new Reactor();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 啟動主從 Reactor,初始化並註冊 Acceptor 到主 Reactor
*/
public void start() throws IOException {
new Acceptor(mainReactor.getSelector(), port); // 將 ServerSocketChannel 註冊到 mainReactor
mainReactorExecutor.execute(mainReactor); //使用線程池來處理main Reactor的連接請求
}
}
Reactor
public class Reactor implements Runnable{
private ConcurrentLinkedQueue<AsyncHandler> events=new ConcurrentLinkedQueue<>();
private final Selector selector;
public Reactor() throws IOException {
this.selector = Selector.open();
}
public Selector getSelector(){
return selector;
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
AsyncHandler handler;
while ((handler = events.poll()) != null) {
handler.getChannel().configureBlocking(false);
SelectionKey sk=handler.getChannel().register(selector, SelectionKey.OP_READ);
sk.attach(handler);
handler.setSk(sk);
}
selector.select(); //阻塞
Set<SelectionKey> selectionKeys=selector.selectedKeys();
Iterator<SelectionKey> it=selectionKeys.iterator();
while(it.hasNext()){
SelectionKey key=it.next();
//獲取attach方法傳入的附加對象
Runnable runnable=(Runnable)key.attachment();
if(runnable!=null){
runnable.run();
}
it.remove();
}
}
}catch (Exception e){
e.printStackTrace();
}
}
public void register(AsyncHandler asyncHandler){
events.offer(asyncHandler);
selector.wakeup();
}
}
Acceptor
public class Acceptor implements Runnable{
final Selector sel;
final ServerSocketChannel serverSocket;
int handleNext = 0;
private final int POOL_SIZE=Runtime.getRuntime().availableProcessors();
private Executor subReactorExecutor= Executors.newFixedThreadPool(POOL_SIZE);
private Reactor[] subReactors=new Reactor[POOL_SIZE-1];
public Acceptor(Selector sel, int port) throws IOException {
this.sel = sel;
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind(new InetSocketAddress(port)); // 綁定端口
// 設置成非阻塞模式
serverSocket.configureBlocking(false);
// 註冊到 選擇器 並設置處理 socket 連接事件
serverSocket.register(sel, SelectionKey.OP_ACCEPT,this);
init();
System.out.println("mainReactor-" + "Acceptor: Listening on port: " + port);
}
public void init() throws IOException {
for (int i = 0; i < subReactors.length; i++) {
subReactors[i]=new Reactor();
subReactorExecutor.execute(subReactors[i]);
}
}
@Override
public synchronized void run() {
try {
// 接收連接,非阻塞模式下,沒有連接直接返回 null
SocketChannel sc = serverSocket.accept();
if (sc != null) {
// 把提示發到界面
sc.write(ByteBuffer.wrap("Multiply Reactor Pattern Example\r\nreactor> ".getBytes()));
System.out.println(Thread.currentThread().getName()+":Main-Reactor-Acceptor: " + sc.socket().getLocalSocketAddress() +" 註冊到 subReactor-" + handleNext);
// 如何解決呢,直接調用 wakeup,有可能還沒有註冊成功又阻塞了。這是一個多線程同步的問題,可以藉助隊列進行處理
Reactor subReactor = subReactors[handleNext];
subReactor.register(new AsyncHandler(sc));
if(++handleNext == subReactors.length) {
handleNext = 0;
}
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
AsyncHandler
public class AsyncHandler implements Runnable{
private SocketChannel channel;
private SelectionKey sk;
ByteBuffer inputBuffer=ByteBuffer.allocate(1024);
ByteBuffer outputBuffer=ByteBuffer.allocate(1024);
StringBuilder builder=new StringBuilder(); //存儲客戶端的完整消息
public AsyncHandler(SocketChannel channel){
this.channel=channel;
}
public SocketChannel getChannel() {
return channel;
}
public void setSk(SelectionKey sk) {
this.sk = sk;
}
@Override
public void run() {
try {
if (sk.isReadable()) {
read();
} else if (sk.isWritable()) {
write();
}
}catch (Exception e){
try {
this.sk.channel().close();
} catch (IOException ioException) {
ioException.printStackTrace();
}
}
}
protected void read() throws IOException {
inputBuffer.clear();
int n=channel.read(inputBuffer);
if(inputBufferComplete(n)){
System.out.println(Thread.currentThread().getName()+":Server端收到客戶端的請求消息:"+builder.toString());
outputBuffer.put(builder.toString().getBytes(StandardCharsets.UTF_8));
this.sk.interestOps(SelectionKey.OP_WRITE); //更改服務的邏輯狀態以及處理的事件類型
}
}
private boolean inputBufferComplete(int bytes) throws EOFException {
if(bytes>0){
inputBuffer.flip(); //轉化成讀取模式
while(inputBuffer.hasRemaining()){ //判斷緩衝區中是否還有元素
byte ch=inputBuffer.get(); //得到輸入的字符
if(ch==3){ //表示Ctrl+c 關閉連接
throw new EOFException();
}else if(ch=='\r'||ch=='\n'){ //表示換行符
return true;
}else{
builder.append((char)ch); //拼接讀取到的數據
}
}
}else if(bytes==-1){
throw new EOFException(); //客戶端關閉了連接
}
return false;
}
private void write() throws IOException {
int written=-1;
outputBuffer.flip(); //轉化為讀模式,判斷是否有數據需要發送
if(outputBuffer.hasRemaining()){
written=channel.write(outputBuffer); //把數據寫回客戶端
}
outputBuffer.clear();
builder.delete(0,builder.length());
if(written<=0){ //表示客戶端沒有輸信息
this.sk.channel().close();
}else{
channel.write(ByteBuffer.wrap("\r\nreactor>".getBytes()));
this.sk.interestOps(SelectionKey.OP_READ);
}
}
}
關注[跟着Mic學架構]公眾號,獲取更多精品原創