通過實例理解Java網路IO模型
- 2019 年 10 月 15 日
- 筆記
網路IO模型及分類
網路IO模型是一個經常被提到的問題,不同的書或者部落格說法可能都不一樣,所以沒必要死摳字眼,關鍵在於理解。
Socket連接
不管是什麼模型,所使用的socket連接都是一樣的。
以下是一個典型的應用伺服器上的連接情況。客戶的各種設備通過Http協議與Tomcat進程交互,Tomcat需要訪問Redis伺服器,它與Redis伺服器也建了好幾個連接。雖然客戶端與Tomcat建的是短連接,很快就會斷開,Tomcat與Redis是長連接,但是它們本質上都是一樣的。
建立一個Socket後,就是"本地IP+port與遠端IP+port"的一個配對,這個Socket由應用進程調用作業系統的系統調用創建,在內核空間會有一個與之對應的結構體,而應用程式拿到的是一個文件描述符(File Describer),就跟打開一個普通的文件一樣,可以讀寫。不同的進程有自己的文件描述符空間,比如進程1中有個socket的fd為100,進程2中也有一個socket的fd為100,它們對應的socket是不一樣的(當然也有可能一樣,因為socket也可以共享)。
Socket是全雙工的,可以同時讀和寫。
對於不同的應用場景,選用的網路IO模型以及其它方面的選項都不一樣。
例如針對客戶端的http 請求,我們一般使用短連接,因為客戶太多,同時使用App的客戶可能很多,但是同一時刻發送請求的客戶端遠少於正在使用的客戶數,如果都建立長連接,記憶體肯定不夠用,所以會用短連接,當然會有http的keep-alive策略,讓一次tcp連接多交互幾次http數據,這樣能減少建鏈。而對於系統內部的應用,例如Tomcat訪問Redis,訪問的機器數有限,如果每次都用短連接,會有太多的損耗用在建鏈上,所以用長連接,可以大大提高效率。
以上說的是長連接和短連接,一般在討論IO模型時不考慮這個,而是考慮的同步非同步,阻塞非阻塞等。而要確定哪種IO模型,也得看場景,對於CPU密集型的應用,例如一次請求需要兩個核不停的100%跑1分鐘,然後返回結果,這種應用使用哪種IO模型都差不多,因為瓶頸在CPU。所以一般是IO密集型的的應用才考慮如何調整IO模型以獲取最大的效率,最典型的就是Web應用,還有像Redis這種應用。
同步非同步、阻塞非阻塞的概念
同步與非同步:描述的是用戶執行緒與內核的交互方式,同步指用戶執行緒發起IO請求後需要等待或者輪詢內核IO操作完成後才能繼續執行;而非同步是指用戶執行緒發起IO請求後仍然繼續執行,當內核IO操作完成後會通知用戶執行緒,或者調用用戶執行緒註冊的回調函數。
阻塞與非阻塞:描述是用戶執行緒調用內核IO操作的方式,阻塞是指IO操作需要徹底完成後才返回到用戶空間;而非阻塞是指IO操作被調用後立即返回給用戶一個狀態值,無需等到IO操作徹底完成。
以read函數調用來說明不同的IO模式。從對端讀取數據分為兩個階段
(1)數據從設備到內核空間(圖中等待數據到達)
(2)數據從內核空間到用戶空間(圖中數據拷貝)
以下阻塞IO,非阻塞IO,IO多路復用,都是同步IO,最後是非同步IO。這個地方可能不好理解,總之同步IO必須是執行緒調用了讀寫函數後,一直阻塞等,或者輪詢查結果,而非同步IO,調完讀寫函數後立刻返回,操作完成後作業系統主動告訴執行緒。
阻塞IO
阻塞IO是指調用了read後,必須等待數據到達,並且複製到了用戶空間,才能返回,否則整個執行緒一直在等待。
所以阻塞IO的問題就是,執行緒在讀寫IO的時候不能幹其它的事情。
非阻塞IO
非阻塞IO在調用read後,可以立刻返回,然後問作業系統,數據有沒有在內核空間準備好,如果準備好了,就可以read出來了。因為不知道什麼時候準備好,要保證實時性,就得不斷的輪詢。
IO多路復用(非阻塞IO)
在使用非阻塞IO的時候,如果每個執行緒訪問網路後都不停的輪詢,那麼這個執行緒就被佔用了,那跟阻塞IO也沒什麼區別了。每個執行緒都輪詢自己的socket,這些執行緒也不能幹其它的事情。
如果能有一個專門的執行緒去輪詢所有的socket,如果數據準備好,就找一個執行緒處理,這就是IO多路復用。當然輪詢的執行緒也可以不用找其他執行緒處理,自己處理就行,例如redis就是這樣的。
IO多路復用,能夠讓一個或幾個執行緒去管理很多個(可以成千上萬)socket連接,這樣連接數就不再受限於系統能啟動的執行緒數。
我們把select輪詢抽出來放在一個執行緒里, 用戶執行緒向其註冊相關socket或IO請求,等到數據到達時通知用戶執行緒,則可以提高用戶執行緒的CPU利用率.這樣, 便實現了非同步方式。
這其中用了Reactor設計模式。
非同步IO
真正的非同步IO需要作業系統更強的支援。 IO多路復用模型中,數據到達內核後通知用戶執行緒,用戶執行緒負責從內核空間拷貝數據; 而在非同步IO模型中,當用戶執行緒收到通知時,數據已經被作業系統從內核拷貝到用戶指定的緩衝區內,用戶執行緒直接使用即可。
非同步IO用了Proactor設計模式。
常見的Web系統里很少使用非同步IO,本文不做過多的探討。
接下來通過一個簡單的java版redis說明各種IO模型。
實戰
接下來我會編寫一個簡單的java版的Redis,它只有get和set功能,並且只支援字元串,只是為了演示各種IO模型,其中一些異常處理之類的做的不到位。
1.阻塞IO+單執行緒+短連接
這種做法只用於寫HelloWorld程式,在這裡主要為了調試以及把一些公共的類提出來。
首先寫一個Redis介面
package org.ifool.niodemo.redis; public interface RedisClient { public String get(String key); public void set(String key,String value); public void close(); }
另外,有個工具類,用於拿到請求數據後,處理請求,並返回結果,還有一些byte轉String,String轉byte,在byte前面添加長度等一些函數,供後續使用。
輸入是get|key或者set|key|value,輸出為0|value或者1|null或者2|bad command。
package org.ifool.niodemo.redis; import java.util.Map; public class Util { //把一個String前邊加上一個byte,表示長度 public static byte[] addLength(String str) { byte len = (byte)str.length(); byte[] ret = new byte[len+1]; ret[0] = len; for(int i = 0; i < len; i++) { ret[i+1] = (byte)str.charAt(i); } return ret; } //根據input返回一個output,操作快取, prefixLength為true,則在前面加長度 //input: //->get|key //->set|key|value //output: //->errorcode|response // ->0|response set成功或者get有值 // ->1|response get的為null // ->2|bad command public static byte[] processRequest(Map<String,String> cache, byte[] request, int length, boolean prefixLength) { if(request == null) { return prefixLength ? addLength("2|bad command") : "2|bad command".getBytes(); } String req = new String(request,0,length); Util.log_debug("command:"+req); String[] params = req.split("\|"); if( params.length < 2 || params.length > 3 || !(params[0].equals("get") || params[0].equals("set"))) { return prefixLength ? addLength("2|bad command") : "2|bad command".getBytes(); } if(params[0].equals("get")) { String value = cache.get(params[1]); if(value == null) { return prefixLength ? addLength("1|null") : "1|null".getBytes(); } else { return prefixLength ? addLength("0|"+value) : ("0|"+value).getBytes(); } } if(params[0].equals("set") && params.length >= 3) { cache.put(params[1],params[2]); return prefixLength ? addLength("0|success"): ("0|success").getBytes(); } else { return prefixLength ? addLength("2|bad command") : "2|bad command".getBytes(); } } public static int LOG_LEVEL = 0; //0 info 1 debug public static void log_debug(String str) { if(LOG_LEVEL >= 1) { System.out.println(str); } } public static void log_info(String str) { if(LOG_LEVEL >= 0) { System.out.println(str); } } }
服務端程式碼如下,在創建服務端ServerSocket的時候,傳入埠8888, backlog的作用是客戶端建立連接時服務端沒法立即處理,能夠等待的隊列長度。服務端程式碼
package org.ifool.niodemo.redis.redis1; import org.ifool.niodemo.redis.Util; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public class RedisServer1 { //全局快取 public static Map<String,String> cache = new ConcurrentHashMap<String,String>(); public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(8888,10); byte[] buffer = new byte[512]; while(true) { //接受客戶端連接請求 Socket clientSocket = null; clientSocket = serverSocket.accept(); System.out.println("client address:" + clientSocket.getRemoteSocketAddress().toString()); //讀取數據並且操作快取,然後寫回數據 try { //讀數據 InputStream in = clientSocket.getInputStream(); int bytesRead = in.read(buffer,0,512); int totalBytesRead = 0; while(bytesRead != -1) { totalBytesRead += bytesRead; bytesRead = in.read(buffer,totalBytesRead,512-totalBytesRead); } //操作快取 byte[] response = Util.processRequest(cache,buffer,totalBytesRead,false); Util.log_debug("response:"+new String(response)); //寫回數據 OutputStream os = clientSocket.getOutputStream(); os.write(response); os.flush(); clientSocket.shutdownOutput(); } catch (IOException e) { System.out.println("read or write data exception"); } finally { try { clientSocket.close(); } catch (IOException ex) { ex.printStackTrace(); } } } } }
客戶端程式碼如下:
package org.ifool.niodemo.redis.redis1; import org.ifool.niodemo.redis.RedisClient; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; public class RedisClient1 implements RedisClient { public static void main(String[] args) { RedisClient redis = new RedisClient1("127.0.0.1",8888); redis.set("123","456"); String value = redis.get("123"); System.out.print(value); } private String ip; private int port; public RedisClient1(String ip, int port) { this.ip = ip; this.port = port; } public String get(String key) { Socket socket = null; try { socket = new Socket(ip, port); } catch(IOException e) { throw new RuntimeException("connect to " + ip + ":" + port + " failed"); } try { //寫數據 OutputStream os = socket.getOutputStream(); os.write(("get|"+key).getBytes()); socket.shutdownOutput(); //不shutdown的話對端會等待read //讀數據 InputStream in = socket.getInputStream(); byte[] buffer = new byte[512]; int offset = 0; int bytesRead = in.read(buffer); while(bytesRead != -1) { offset += bytesRead; bytesRead = in.read(buffer, offset, 512-offset); } String[] response = (new String(buffer,0,offset)).split("\|"); if(response[0].equals("2")) { throw new RuntimeException("bad command"); } else if(response[0].equals("1")) { return null; } else { return response[1]; } } catch(IOException e) { throw new RuntimeException("network error"); } finally { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } public void set(String key, String value) { Socket socket = null; try { socket = new Socket(ip, port); } catch(IOException e) { throw new RuntimeException("connect to " + ip + ":" + port + " failed"); } try { OutputStream os = socket.getOutputStream(); os.write(("set|"+key+"|"+value).getBytes()); os.flush(); socket.shutdownOutput(); InputStream in = socket.getInputStream(); byte[] buffer = new byte[512]; int offset = 0; int bytesRead = in.read(buffer); while(bytesRead != -1) { offset += bytesRead; bytesRead = in.read(buffer, offset, 512-offset); } String bufString = new String(buffer,0,offset); String[] response = bufString.split("\|"); if(response[0].equals("2")) { throw new RuntimeException("bad command"); } } catch(IOException e) { throw new RuntimeException("network error"); } finally { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } public void close() { } }
2.阻塞IO+多執行緒+短連接
一般應用伺服器用的都是這種模型,主執行緒一直阻塞accept,來了一個連接就交給一個執行緒,繼續等待連接,然後這個處理執行緒讀寫完後負責關閉連接。
服務端程式碼
package org.ifool.niodemo.redis.redis2; import org.ifool.niodemo.redis.Util; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.ServerSocket; import java.net.Socket; import java.util.Map; import java.util.concurrent.*; public class RedisServer2 { //全局快取 public static Map<String,String> cache = new ConcurrentHashMap<String,String>(); public static void main(String[] args) throws IOException { //用於處理請求的執行緒池 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(200, 1000, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000)); ServerSocket serverSocket = new ServerSocket(8888,1000); while(true) { //接受客戶端連接請求 Socket clientSocket = serverSocket.accept(); Util.log_debug(clientSocket.getRemoteSocketAddress().toString()); //讓執行緒池處理這個請求 threadPool.execute(new RequestHandler(clientSocket)); } } } class RequestHandler implements Runnable{ private Socket clientSocket; public RequestHandler(Socket socket) { clientSocket = socket; } public void run() { byte[] buffer = new byte[512]; //讀取數據並且操作快取,然後寫回數據 try { //讀數據 InputStream in = clientSocket.getInputStream(); int bytesRead = in.read(buffer,0,512); int totalBytesRead = 0; while(bytesRead != -1) { totalBytesRead += bytesRead; bytesRead = in.read(buffer,totalBytesRead,512-totalBytesRead); } //操作快取 byte[] response = Util.processRequest(RedisServer2.cache,buffer,totalBytesRead,false); Util.log_debug("response:"+new String(response)); //寫回數據 OutputStream os = clientSocket.getOutputStream(); os.write(response); os.flush(); clientSocket.shutdownOutput(); } catch (IOException e) { System.out.println("read or write data exception"); } finally { try { clientSocket.close(); } catch (IOException ex) { ex.printStackTrace(); } } } }
客戶端程式碼,程式碼跟前邊的沒啥變化,只是這次我加了一個多執行緒的讀寫,10個執行緒每個執行緒讀寫10000次。
public static void main(String[] args) { final RedisClient redis = new RedisClient1("127.0.0.1",8888); redis.set("123","456"); String value = redis.get("123"); System.out.print(value); redis.close(); System.out.println(new Timestamp(System.currentTimeMillis())); testMultiThread(); System.out.println(new Timestamp(System.currentTimeMillis())); } public static void testMultiThread() { Thread[] threads = new Thread[10]; for(int i = 0; i < 10; i++) { threads[i] = new Thread(new Runnable() { public void run() { RedisClient redis = new RedisClient2("127.0.0.1",8888); for(int j=0; j < 300; j++) { Random rand = new Random(); String key = String.valueOf(rand.nextInt(1000)); String value = String.valueOf(rand.nextInt(1000)); redis.set(key,value); String value1 = redis.get(key); } } }); threads[i].start(); } for(int i = 0; i < 10; i++) { try { threads[i].join(); } catch (InterruptedException e) { e.printStackTrace(); } } }
用這種方式,在10個並發不停讀寫的情況下,寫10000次,出現了一些沒法連接的異常,如下:
java.net.NoRouteToHostException: Can't assign requested address
查了下跟系統參數配置,mac上不知道怎麼調就沒調,改成讀寫300次的時候沒報錯,大約用1s鍾。
3.阻塞IO+多執行緒+長連接
用短連接的時候,我們可以用inputstream.read() == -1來判斷讀取結束,但是用長連接時,數據是源源不斷的,有可能有粘包或者半包問題,我們需要能從流中找到一次請求的開始和結束。有多種方式,例如使用固定長度、固定分隔符、在前面加長度等方法。此處使用前邊加長度的方法,在前面放一個byte,表示一次請求的長度,byte最大是127,所以請求長度不應大於127個位元組。
由於我們客戶端訪問的方式是寫完請求後,等待服務端返回數據,等待期間該socket不會被其它人寫,所以不存在粘包的問題,只存在半包的問題。有些請求方式可能是寫完後在未等待服務端返回就允許其它執行緒寫,那樣就可能有半包。
一般客戶端用長連接的時候,都是建一個連接池,用的時候上鎖獲取連接,我們在這個地方直接讓一個執行緒持有一個連接一個讀寫,這樣減少了執行緒切換與上鎖的開銷,能實現更大的吞吐量。
客戶端程式碼這次發生了較大變化。
package org.ifool.niodemo.redis.redis3; import org.ifool.niodemo.redis.RedisClient; import org.ifool.niodemo.redis.Util; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; import java.sql.Timestamp; import java.util.Random; public class RedisClient3 implements RedisClient { public static void main(String[] args) { RedisClient redis = new RedisClient3("127.0.0.1",8888); redis.set("123","456"); String value = redis.get("123"); System.out.print(value); redis.close(); System.out.println(new Timestamp(System.currentTimeMillis())); testMultiThread(); System.out.println(new Timestamp(System.currentTimeMillis())); } public static void testMultiThread() { Thread[] threads = new Thread[10]; for(int i = 0; i < 10; i++) { threads[i] = new Thread(new Runnable() { public void run() { RedisClient redis = new RedisClient3("127.0.0.1",8888); for(int j=0; j < 50; j++) { Random rand = new Random(); String key = String.valueOf(rand.nextInt(1000)); String value = String.valueOf(rand.nextInt(1000)); redis.set(key,value); String value1 = redis.get(key); } } }); threads[i].start(); } for(int i = 0; i < 10; i++) { try { threads[i].join(); } catch (InterruptedException e) { e.printStackTrace(); } } } private String ip; private int port; private Socket socket; public RedisClient3(String ip, int port) { this.ip = ip; this.port = port; try { socket = new Socket(ip, port); } catch(IOException e) { throw new RuntimeException("connect to " + ip + ":" + port + " failed"); } } public String get(String key) { try { //寫數據,前邊用一個byte存儲長度 OutputStream os = socket.getOutputStream(); String cmd = "get|"+key; byte length = (byte)cmd.length(); byte[] data = new byte[cmd.length()+1]; data[0] = length; for(int i = 0; i < cmd.length(); i++) { data[i+1] = (byte)cmd.charAt(i); } os.write(data); os.flush(); //讀數據,第一個位元組是長度 InputStream in = socket.getInputStream(); int len = in.read(); if(len == -1) { throw new RuntimeException("network error"); } byte[] buffer = new byte[len]; int offset = 0; int bytesRead = in.read(buffer,0,len); while(offset < len) { offset += bytesRead; bytesRead = in.read(buffer, offset, len-offset); } String[] response = (new String(buffer,0,offset)).split("\|"); if(response[0].equals("2")) { throw new RuntimeException("bad command"); } else if(response[0].equals("1")) { return null; } else { return response[1]; } } catch(IOException e) { throw new RuntimeException("network error"); } finally { } } public void set(String key, String value) { try { //寫數據,前邊用一個byte存儲長度 OutputStream os = socket.getOutputStream(); String cmd = "set|"+key + "|" + value; byte length = (byte)cmd.length(); byte[] data = new byte[cmd.length()+1]; data[0] = length; for(int i = 0; i < cmd.length(); i++) { data[i+1] = (byte)cmd.charAt(i); } os.write(data); os.flush(); InputStream in = socket.getInputStream(); int len = in.read(); if(len == -1) { throw new RuntimeException("network error"); } byte[] buffer = new byte[len]; int offset = 0; int bytesRead = in.read(buffer,0,len); while(offset < len) { offset += bytesRead; bytesRead = in.read(buffer, offset, len-offset); } String bufString = new String(buffer,0,offset); Util.log_debug(bufString); String[] response = bufString.split("\|"); if(response[0].equals("2")) { throw new RuntimeException("bad command"); } } catch(IOException e) { throw new RuntimeException("network error"); } finally { } } public void close() { try { socket.close(); } catch(IOException ex) { ex.printStackTrace(); } } }
服務端建立一個連接,就由一個執行緒一直處理這個連接,有數據就處理,沒數據就不處理。這樣的話,每個連接一個執行緒,如果連接數較大,就會有問題。
package org.ifool.niodemo.redis.redis3; import org.ifool.niodemo.redis.Util; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.Map; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class RedisServer3 { //全局快取 public static Map<String,String> cache = new ConcurrentHashMap<String,String>(); public static void main(String[] args) throws IOException { //用於處理請求的執行緒池 ThreadPoolExecutor threadPool = new ThreadPoolExecutor(20, 1000, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(5)); ServerSocket serverSocket = new ServerSocket(8888, 10); byte[] buffer = new byte[512]; while (true) { //接受客戶端連接請求 Socket clientSocket = null; try { clientSocket = serverSocket.accept(); Util.log_debug(clientSocket.getRemoteSocketAddress().toString()); } catch (IOException e) { e.printStackTrace(); } //讓執行緒池處理這個請求 threadPool.execute(new RequestHandler(clientSocket)); } } } class RequestHandler implements Runnable{ private Socket clientSocket; public RequestHandler(Socket socket) { clientSocket = socket; } public void run() { byte[] buffer = new byte[512]; //讀取數據並且操作快取,然後寫回數據 try { while(true) { //讀數據 InputStream in = clientSocket.getInputStream(); int len = in.read(); //讀取長度 if(len == -1) { throw new IOException("socket closed by client"); } int bytesRead = in.read(buffer, 0, len); int totalBytesRead = 0; while (totalBytesRead < len) { totalBytesRead += bytesRead; bytesRead = in.read(buffer, totalBytesRead, len - totalBytesRead); } //操作快取 byte[] response = Util.processRequest(RedisServer3.cache,buffer, totalBytesRead,true); Util.log_debug("response:" + new String(response)); //寫回數據 OutputStream os = clientSocket.getOutputStream(); os.write(response); os.flush(); } } catch (IOException e) { System.out.println("read or write data exception"); } finally { try { clientSocket.close(); Util.log_debug("socket closed"); } catch (IOException ex) { ex.printStackTrace(); } } } }
使用這個方式,10個執行緒連續讀寫10000次,也就是累計訪問20000萬次只需要3s。
4.阻塞IO+單執行緒輪詢+多執行緒處理+長連接(不可行)
多執行緒和長連接大大提高了效率,但是如果連接數太多,那麼需要太多的執行緒,這樣肯定不可行。這樣大部分執行緒即使沒數據也不能幹其它的,就耗在這個連接上了。
我們可不可以讓一個執行緒去負責等待這些socket,有數據了就告訴工作執行緒池。
程式碼如下,加了一個執行緒遍歷已經連接的socket,然後如果socket.getInputStream().available() > 0就通知執行緒池。
這個程式有些情況下能正常工作,但是實際是有問題的,關鍵就在於上面的available函數是阻塞的,每次輪詢所有的socket,都需要挨個等待是否已經有數據了,所以就是串列。在java里沒法對socket單獨設置非阻塞,必須從NIO才行,如果用C語言是可行的,但是這裡不行。
package org.ifool.niodemo.redis.redis4; import org.ifool.niodemo.redis.Util; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class RedisServer4 { //全局快取 public static Map<String,String> cache = new ConcurrentHashMap<String,String>(); //當前的socket final public static Set<Socket> socketSet = new HashSet<Socket>(10); public static void main(String[] args) throws IOException { //用於處理請求的執行緒池 final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(20, 1000, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000)); ServerSocket serverSocket = new ServerSocket(8888,100); //啟動一個執行緒用於一直掃描可以讀取數據的socket,並且去掉已經關閉的連接 Thread thread = new Thread(new Runnable() { public void run() { //找到可以讀取的socket,處理 while (true) { synchronized (socketSet) { Iterator<Socket> it = socketSet.iterator(); while(it.hasNext()) { Socket socket = it.next(); if (socket.isConnected()) { try { if (!socket.isInputShutdown() && socket.getInputStream().available() > 0) { it.remove(); threadPool.execute(new RequestHandler(socket)); } } catch (IOException ex) { System.out.println("socket already closed1"); socketSet.remove(socket); try { socket.close(); } catch (IOException e) { System.out.println("socket already closed2"); } } } else { socketSet.remove(socket); try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } } } }); thread.start(); while(true) { //接受客戶端連接請求,把新建的socket加入socketset Socket clientSocket = null; try { clientSocket = serverSocket.accept(); Util.log_debug("client address:" + clientSocket.getRemoteSocketAddress().toString()); synchronized (socketSet) { socketSet.add(clientSocket); } } catch (IOException e) { e.printStackTrace(); } } } } class RequestHandler implements Runnable{ private Socket clientSocket; public RequestHandler(Socket socket) { clientSocket = socket; } public void run() { byte[] buffer = new byte[512]; //讀取數據並且操作快取,然後寫回數據 try { //讀數據 InputStream in = clientSocket.getInputStream(); int len = in.read(); //讀取長度 if(len == -1) { throw new IOException("socket closed by client"); } int bytesRead = in.read(buffer, 0, len); int totalBytesRead = 0; while (totalBytesRead < len) { totalBytesRead += bytesRead; bytesRead = in.read(buffer, totalBytesRead, len - totalBytesRead); } //操作快取 byte[] response = Util.processRequest(RedisServer4.cache,buffer, totalBytesRead,true); Util.log_debug("response:" + new String(response)); //寫回數據 OutputStream os = clientSocket.getOutputStream(); os.write(response); os.flush(); synchronized (RedisServer4.socketSet) { RedisServer4.socketSet.add(clientSocket); } } catch (IOException e) { e.printStackTrace(); System.out.println("read or write data exception"); } finally { } } }
5.IO多路復用+單執行緒輪詢+多執行緒處理+長連接
在上述例子中我們試圖用普通socket實現類似select的功能,在Java里是不可行的,必須用NIO。我們只需要一個select函數就能輪詢所有的連接是否準備好數據,準備好了就能調用執行緒池裡的執行緒處理。
要使用NIO,需要了解ByteBuffer, Channel等內容,比如ByteBuffer設計的就比較麻煩,此處不再展開。
客戶端程式碼暫時不用NIO,還是用原來的,服務端程式碼如下:
package org.ifool.niodemo.redis.redis5; import org.ifool.niodemo.redis.Util; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.SyncFailedException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.*; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class RedisServer5 { //全局快取 public static Map<String,String> cache = new ConcurrentHashMap<String,String>(); public static void main(String[] args) throws IOException { //用於處理請求的執行緒池 final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(20, 1000, 30, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(1000)); ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.socket().bind(new InetSocketAddress(8888),1000); Selector selector = Selector.open(); ssc.configureBlocking(false); //必須設置成非阻塞 ssc.register(selector, SelectionKey.OP_ACCEPT); //serverSocket只關心accept while(true) { int num = selector.select(); if(num == 0) { continue; } Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectionKeys.iterator(); while(it.hasNext()) { SelectionKey key = it.next(); it.remove(); if(key.isAcceptable()) { SocketChannel sc = ssc.accept(); sc.configureBlocking(false); //設置成非阻塞才能監聽 sc.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(512) ); System.out.println("new connection"); } if(key.isReadable()) { SocketChannel clientSocketChannel = (SocketChannel)key.channel(); //System.out.println("socket readable"); if(!clientSocketChannel.isConnected()) { clientSocketChannel.finishConnect(); key.cancel(); clientSocketChannel.close(); System.out.println("socket closed2"); continue; } ByteBuffer buffer = (ByteBuffer)key.attachment(); int len = clientSocketChannel.read(buffer); Socket socket = clientSocketChannel.socket(); if(len == -1) { clientSocketChannel.finishConnect(); key.cancel(); clientSocketChannel.close(); System.out.println("socket closed1"); } else { threadPool.execute(new RequestHandler(clientSocketChannel, buffer)); } } } } } } class RequestHandler implements Runnable{ private SocketChannel channel; private ByteBuffer buffer; public RequestHandler(SocketChannel channel, Object buffer) { this.channel = channel; this.buffer = (ByteBuffer)buffer; } public void run() { //讀取數據並且操作快取,然後寫回數據 try { int position = buffer.position(); //切換成讀模式,以便把第一個位元組到長度讀出來 buffer.flip(); int len = buffer.get(); //讀取長度 if(len > position + 1) { buffer.position(position); buffer.limit(buffer.capacity()); return; } byte[] data = new byte[len]; buffer.get(data,0,len); //操作快取 byte[] response = Util.processRequest(RedisServer5.cache,data, len,true); Util.log_debug("response:" + new String(response)); buffer.clear(); buffer.put(response); buffer.flip(); channel.write(buffer); buffer.clear(); } catch (IOException e) { System.out.println("read or write data exception"); } finally { } } }
自己寫NIO程式有很多坑,上面的程式碼有時候會出問題,有些異常沒處理好。但是10個執行緒不停寫10000次也是3s多。
IO多路復用+Netty
使用java的原生NIO寫程式很容易出問題,因為API比較複雜,而且有很多異常要處理,比如連接的關閉,粘包半包等,使用Netty這種成熟的框架會比較好寫。
Netty常用的執行緒模型如下圖所示,mainReactor負責監聽server socket,accept新連接,並將建立的socket分派給subReactor。subReactor負責多路分離已連接的socket,讀寫網路數據,對業務處理功能,其扔給worker執行緒池完成。通常,subReactor個數上可與CPU個數等同。
客戶端程式碼如下所示。其中的兩個NioEventLoop就是上面的mainReactor和subReactor。第一個參數為0,是使用默認執行緒數的意思,這樣mainReactor一般是1個,subReactor一般與CPU核相通。
我們這裡只有boss(mainReactor)和worker(subReactor),一般情況下,還有一個執行緒池,用於處理真正的業務邏輯,因為worker是用來讀取和解碼數據的,如果在這個worker里處理業務邏輯,比如訪問資料庫,是不合適的。只是我們這個場景就類似於Redis,所以沒有用另一個執行緒池。
package org.ifool.niodemo.redis.redis6; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import org.ifool.niodemo.redis.Util; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Map; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; public class RedisServer6 { //全局快取 public static Map<String,String> cache = new ConcurrentHashMap<String,String>(); public static void main(String[] args) throws IOException, InterruptedException { //用於處理accept事件的執行緒池 EventLoopGroup bossGroup = new NioEventLoopGroup(0, new ThreadFactory() { AtomicInteger index = new AtomicInteger(0); public Thread newThread(Runnable r) { return new Thread(r,"netty-boss-"+index.getAndIncrement()); } }); //用於處理讀事件的執行緒池 EventLoopGroup workerGroup = new NioEventLoopGroup(0, new ThreadFactory() { AtomicInteger index = new AtomicInteger(0); public Thread newThread(Runnable r) { return new Thread(r,"netty-worker-"+index.getAndIncrement()); } }); ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup,workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG,50) .childHandler(new ChildChannelHandler()); ChannelFuture future = bootstrap.bind(8888).sync(); future.channel().closeFuture().sync(); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } /**這個類就是供netty-worker調用的**/ class ChildChannelHandler extends ChannelInitializer<SocketChannel> { protected void initChannel(SocketChannel socketChannel) throws Exception { //先通過一個LengthFieldBasedFrameDecoder分包,再傳給RequestHandler socketChannel.pipeline() .addLast(new RedisDecoder(127,0,1)) .addLast(new RequestHandler()); } } class RequestHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf)msg; int len = buf.readableBytes() - 1; int lenField = buf.readByte(); if(len != lenField) { ByteBuf resp = Unpooled.copiedBuffer("2|bad cmd".getBytes()); ctx.write(resp); } byte[] req = new byte[len]; buf.readBytes(req,0,len); byte[] response = Util.processRequest(RedisServer6.cache,req,len,true); ByteBuf resp = Unpooled.copiedBuffer(response); ctx.write(resp); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); } } class RedisDecoder extends LengthFieldBasedFrameDecoder { public RedisDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength) { super(maxFrameLength, lengthFieldOffset, lengthFieldLength); } }
可以看到,運行後產生了一個boss執行緒和10個worker執行緒。
用netty寫的就比較穩定了,10個寫成不停寫10000次也是3秒,但是不用擔心執行緒數了。
總結
網路IO模型只看概念什麼的很難理解,只有通過實例才能理解的更深刻。我們通過回答一個問題來總結一下:
為什麼redis能通過單執行緒實現上萬的tps?
我們把redis處理請求的過程細化:
(1)讀取原始數據
(2)解析並處理數據(處理業務邏輯)
(3)寫會返回數據
讀取數據是通過IO多路復用實現,而在底層,是通過epoll實現,epoll相比於select(不是Java NIO的select,是Linux的select)主要有以下兩個優點:
一是提高了遍歷socket的效率,即時有上百萬個連接,它也只會遍歷有事件的連接,而select需要全部遍歷一遍。
二是通過mmap實現了內核態與用戶態的共享記憶體,也就是數據從網卡到達複製到內核空間,不需要複製到用戶空間了,所以使用epoll,如果發現有讀事件,那麼記憶體里的數據也準備好了,不需要拷貝。
通過以上可以得出,讀取數據是十分快的。
接下來就是處理數據,這才是能使用單執行緒的本質原因。redis的業務邏輯是純記憶體操作,耗時是納秒級的,所以事件可以忽略不計。假如我們是一個複雜的web應用,業務邏輯涉及到讀資料庫,調用其它模組,那麼是不能用單執行緒的。
同樣,寫數據也是通過epoll共享記憶體,只要把結果計算後放到用戶記憶體,然後通知作業系統就可以了。
所以,redis能單執行緒支撐上萬tps的前提就是每個請求都是記憶體操作,事件都特別短,但凡有一次請求慢了,就會導致請求阻塞。假設99.99%的請求響應時間都在1ms以內,而0.01%的請求時間為1s,那麼單執行緒模型在處理1s請求的時候,剩餘1ms的請求也都得排隊。