netty(1): IO執行緒模型的變遷
- 2019 年 10 月 4 日
- 筆記
jdk 1.4之前所有的socket通訊都採用同步阻塞模式(bio),這種一請求一應答的通訊模型簡化了上層應用的開發,但是在可靠性和性能方面存在巨大的弊端,下圖是bio通訊模型圖

通常由一個獨立的Acceptor執行緒負責監聽客戶端的連接,接收到客戶端連接之後為客戶端連接創建一個新的執行緒處理請求消息,處理完成之後,返回應答消息給客戶端,執行緒銷毀。該架構最大的問題是不具備彈性伸縮能力,當並發訪問量增加後,服務端的執行緒個數和並發訪問數成線性正比,當執行緒數劇增後,會引起一系列連鎖反應,直至系統崩潰。
//同步阻塞式,單執行緒處理 public class IOServerSingleThread { private static final Logger LOGGER = LoggerFactory.getLogger(IOServer.class); public static void main(String[] args) { LOGGER.info("nihao"); ServerSocket serverSocket = null; try { serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(1234)); serverSocket.setSoTimeout(0); } catch (IOException ex) { LOGGER.error("Listen failed", ex); return; } InputStream inputstream = null; try{ while(true) { Socket socket = serverSocket.accept(); inputstream = socket.getInputStream(); LOGGER.info("Received message {}", IOUtils.toString(inputstream)); IOUtils.closeQuietly(inputstream); } } catch(IOException ex) { IOUtils.closeQuietly(inputstream); LOGGER.error("Read message failed", ex); } } } //同步阻塞多執行緒處理 public class IOServerMultiThread { private static final Logger LOGGER = LoggerFactory.getLogger(IOServerMultiThread.class); public static void main(String[] args) { ServerSocket serverSocket = null; try { serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(2345)); serverSocket.setSoTimeout(0); } catch (IOException ex) { LOGGER.error("Listen failed", ex); return; } try{ while(true) { Socket socket = serverSocket.accept(); new Thread( () -> { try{ InputStream inputstream = socket.getInputStream(); LOGGER.info("Received message {}", IOUtils.toString(inputstream)); IOUtils.closeQuietly(inputstream); } catch (IOException ex) { LOGGER.error("Read message failed", ex); } }).start(); } } catch(IOException ex) { LOGGER.error("Accept connection failed", ex); } } }
由於執行緒數會隨著客戶端請求的增加而增加,由此我們進行分析優化,遞進出如下的模型:偽非同步模型

服務端接收到客戶端連接之後,不創建獨立的執行緒,而是將socket連接封裝成Task,將Task放入執行緒池的任務隊列中執行,這樣可以有效控制執行緒的規模,防止執行緒膨脹導致系統的崩潰,還能有效重複利用執行緒
//偽非同步模型 public class IOServerThreadPool { private static final Logger LOGGER = LoggerFactory.getLogger(IOServerThreadPool.class); public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); ServerSocket serverSocket = null; try { serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress(2345)); } catch (IOException ex) { LOGGER.error("Listen failed", ex); return; } try{ while(true) { Socket socket = serverSocket.accept(); executorService.submit(() -> { try{ InputStream inputstream = socket.getInputStream(); LOGGER.info("Received message {}", IOUtils.toString(new InputStreamReader(inputstream))); } catch (IOException ex) { LOGGER.error("Read message failed", ex); } }); } } catch(IOException ex) { try { serverSocket.close(); } catch (IOException e) { } LOGGER.error("Accept connection failed", ex); } } }
下面我們來說說actor模型,reactor模型,proactor模型
actor模型是建立在用戶態的一種並發模型,實體間的調用,通過消息進行通訊,消息的發送和接收是完全非同步的,消息的發送方會將消息發送到一個FIFO隊列,消息的接收方會從這個FIFO隊列中取出消息,因為是FIFO隊列,消息必然有先來後到,先到的消息會被先處理,後到的消息阻塞,每一個任務由以下三個屬性:tag:用以區別於系統中的其它任務
target:通訊到達的地址
communication:包含在target上的Actor處理任務時可獲取的資訊
actor的具體模型會在後面的一遍中詳細講解
reactor模型
reactor,即反應堆。reactor的一般工作工作過程是首先在Reactor中註冊事件,並在註冊時指定某個已定義的回調函數,當客戶端發送請求時,在reactor中會觸發剛才註冊的事件,並調用對應的處理函數。

這是一個reactor模型的簡略圖,比較通俗化的圖形如下

reactor模型中包含三種角色,分別是reactor,acceptor,handler
reactor: 負責派發IO事件給對應的角色處理,為了監聽IO事件,select必須實現在reactor中
acceptor: 負責接受client的連線,然後給client綁定一個handler並註冊IO事件到reactor上監聽