java網路編程——多執行緒數據收發並行
- 2020 年 3 月 18 日
- 筆記
基本介紹與思路
收發並行
前一篇部落格中,完成了客戶端與服務端的簡單TCP交互,但這種交互是觸髮式的:客戶端發送一條消息,服務端收到後再回送一條。沒有做到收發並行。收發並行的字面意思很容易理解,即數據的發送與接收互相不干擾,相互獨立。當然,要保證服務端和客戶端都能做到收發並行。
業務邏輯
脫離業務邏輯的實踐是毫無意義的,先描述一下本實踐中的業務邏輯:一個服務端接受多個客戶端的連接,連接後,向各個客戶端定時發送時間戳數據,同時在並行條件下,接受各個客戶端發送來的數據並顯示;客戶端鍵盤輸入字元串,發送給服務端,同時在並行條件下,接收伺服器發來的時間戳數據並顯示。
實現思路
實現發送與接收並行,思路其實非常直觀,即建立兩個執行緒,分別用來實現輸入流和輸出流。我的程式碼的設計方案如下圖所示:
- 服務端:創建一個監聽客戶端連接的執行緒,執行緒中一旦接收到請求,創建一個對應該客戶端收發處理的對象,對象中創建輸入流執行緒,並使用單例執行緒池創建輸出流執行緒。主執行緒使用鍵盤輸入流System.in來進行阻塞。同時主執行緒中創建Timer定時器,定時向輸出流發送數據。
- 客戶端:主執行緒發送連接請求,與伺服器建立連接。使用鍵盤輸入流System.in來阻塞主執行緒,同時作為輸出流使用;創建一個輸入流執行緒,非同步運行,接收伺服器數據。
程式碼分析
源程式碼文件結構如下圖所示
服務端
伺服器端分為三個部分,分別是Server.java,TCPServer.java和ClientHandler.java
Server.java
package Server; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.text.SimpleDateFormat; import java.util.TimerTask; import java.util.Timer; import java.util.Date; public class Server { private static SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd_HH:mm:ss"); public static void main(String[] args){ try { TCPServer.accept(); new Timer("Timer").schedule(new TimerTask() { @Override public void run() { TCPServer.broadcast(df.format(new Date())); } }, 1000,5000); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in)); String str; //因為ClientListen是非同步執行緒,使用鍵盤輸入流將主執行緒阻塞住,保證跟ClientListen執行緒同步,同時可控制ClientListen服務的退出 do{ str = bufferedReader.readLine(); }while (str.equalsIgnoreCase("serverExit")); }catch (Exception e){ System.out.println("監聽請求過程中異常退出"); } try { TCPServer.stop(); } catch (IOException e) { System.out.println("關閉套接字過程中出現異常"); } finally { System.out.println("伺服器端套接字已關閉!"); } } }
TCPServer.java
package Server; import java.io.IOException; import java.net.*; import java.util.ArrayList; import java.util.UUID; class TCPServer { private static int LOCAL_PORT = 3001; private static ClientListenHandle clientListenHandle; private static ArrayList<ClientHandler> clientHandlerList = new ArrayList<ClientHandler>(); static void accept() throws IOException { //創建伺服器端套接字 ServerSocket serverSocket = createSocket(); InitSocket(serverSocket); System.out.println("伺服器準備就緒 addr: " + Inet4Address.getLocalHost() + " /port: " + LOCAL_PORT); System.out.println("開始監聽客戶端連接..."); //創建執行緒監聽客戶端請求 clientListenHandle = new ClientListenHandle(serverSocket); clientListenHandle.start(); } static void stop() throws IOException { for (ClientHandler clientHandler : clientHandlerList) { clientHandler.socketClose(); } clientHandlerList.clear(); clientListenHandle.exit(); } private static ServerSocket createSocket() throws IOException { ServerSocket socket = new ServerSocket(LOCAL_PORT, 50); return socket; } private static void InitSocket(ServerSocket socket) throws SocketException { // 是否復用未完全關閉的地址埠 socket.setReuseAddress(true); // 等效Socket#setReceiveBufferSize socket.setReceiveBufferSize(64 * 1024 * 1024); // 設置serverSocket#accept超時時間,不設置即永久等待 // serverSocket.setSoTimeout(2000); // 設置性能參數:短鏈接,延遲,頻寬的相對重要性 socket.setPerformancePreferences(1, 1, 1); } static void broadcast(String msg) { for (ClientHandler clientHandler : clientHandlerList) { clientHandler.write(msg); } } /** * 監聽客戶端請求的執行緒 */ static class ClientListenHandle extends Thread { private final ServerSocket serverSocket; private Boolean done = false; ClientListenHandle(ServerSocket serverSocket) { this.serverSocket = serverSocket; } @Override public void run() { super.run(); try { do { Socket client; try { client = serverSocket.accept(); } catch (Exception e) { continue;//某一個客戶端連接失敗,要保證其它客戶端能正常連接 } String uuid = UUID.randomUUID().toString();//為客戶端生成唯一標識 System.out.println("已接受連接client:"+uuid+" /Addr:"+client.getInetAddress()+" /Port:"+client.getPort()); //為該客戶端實例化一個ClientHandler對象,注入對象刪除操作的lambda表達式 ClientHandler clientHandle = new ClientHandler(client, handler -> clientHandlerList.remove(handler), uuid); clientHandle.read(); clientHandlerList.add(clientHandle); } while (!done); } catch (Exception e) { if (!done) { System.out.println("異常退出!"); } } } void exit() throws IOException { done = true; serverSocket.close(); } } }
ClientHandler.java
package Server; import java.io.*; import java.net.Socket; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ClientHandler { private final Socket client; private final ReadHandler readHandler; private final WriteHandle writeHandler; private final Removable removable; private final String uid; ClientHandler(Socket socket, Removable removable, String uid) throws IOException { this.client = socket; this.readHandler = new ReadHandler(socket.getInputStream()); this.writeHandler = new WriteHandle(socket.getOutputStream()); this.removable = removable; this.uid = uid; } void read() { readHandler.start(); } void write(String msg) { System.out.println("Server -->> " + uid + " : " + msg); writeHandler.write(msg); } /** * 把輸入輸出流和套接字都關閉 */ void socketClose(){ try { readHandler.exit(); writeHandler.exit(); client.close(); } catch (IOException e) { e.printStackTrace(); }finally { System.out.println("客戶端:"+uid+" 套接字連接已關閉"); } } /** * 把自身從對象列表中清除掉,具體方法是使用lambda表達式來注入的 */ void removeClientHandler() { removable.removeClientHandle(this); } /** * 定義一個介面,接收lambda表達式 */ interface Removable { void removeClientHandle(ClientHandler clientHandler); } /** * 輸入流操作執行緒 */ class ReadHandler extends Thread { private final InputStream inputStream; private Boolean flag = true; ReadHandler(InputStream inputStream) { this.inputStream = inputStream; } @Override public void run() { super.run(); BufferedReader socketInput = null; try { socketInput = new BufferedReader(new InputStreamReader(inputStream)); do { String str = socketInput.readLine(); //不知道為什麼,客戶端關閉時,這裡直接報異常,獲取不到null if (str.equalsIgnoreCase("exit")) { System.out.println("已無法讀取客戶端數據!"); throw new Exception(); } System.out.println(uid + " -->> server : " + str); } while (flag); } catch (Exception e) { if (flag) { System.out.println("讀取客戶端過程中異常退出"); ClientHandler.this.removeClientHandler(); ClientHandler.this.socketClose(); } } } void exit() throws IOException { flag = false; inputStream.close(); } } /** * 輸出流操作執行緒,使用單例執行緒池,可以自動等待任務並處理,無需人工添加阻塞操作 */ class WriteHandle { private final OutputStream outputStream; private final ExecutorService executorService; WriteHandle(OutputStream outputStream) { this.outputStream = outputStream; this.executorService = Executors.newSingleThreadExecutor(); } private void write(String msg){ executorService.execute(new WriteRunnable(msg,outputStream)); } void exit() throws IOException{ outputStream.close(); executorService.shutdown(); } class WriteRunnable implements Runnable{ private final String msg; private final PrintStream printStream; WriteRunnable(String msg, OutputStream outputStream) { this.msg = msg; this.printStream = new PrintStream(outputStream); } @Override public void run() { try { printStream.println(msg); } catch (Exception e) { System.out.println("列印輸出異常!"); } } } } }
客戶端
Client.java
package Client; import java.io.*; import java.util.UUID; import Client.bean.ServerInfo; public class Client { public static void main(String[] args)throws IOException { ServerInfo serverInfo = new ServerInfo(UUID.randomUUID().toString(),"127.0.2.16",3001); System.out.println("準備發起伺服器連接..."); System.out.println("伺服器資訊:Addr:"+serverInfo.getAddress()+" /Port:"+serverInfo.getPort()); try { TCPClient.connect(serverInfo); }catch (Exception e){ System.out.println("連接失敗,退出"); } } }
TCPClient.java
package Client; import Client.bean.ServerInfo; import java.io.*; import java.net.*; class TCPClient { static void connect(ServerInfo serverInfo) throws IOException { Socket clientSocket = createSocket();//建立套接字 InitSocket(clientSocket);//初始化套接字 //連接遠程伺服器 clientSocket.connect(new InetSocketAddress(serverInfo.getAddress(), serverInfo.getPort()), 3000); System.out.println("已連接server"); try { //輸入流執行緒 ReadHandle readHandle = new ReadHandle(clientSocket.getInputStream()); readHandle.start(); //輸出流 write(clientSocket); //當輸出流結束時,關閉輸入流 readHandle.exit(); } catch (Exception e) { System.out.println("出現異常!"); } finally { clientSocket.close(); System.out.println("客戶端結束"); } } private static Socket createSocket() throws IOException { Socket socket = new Socket(); return socket; } private static void InitSocket(Socket socket) throws SocketException { // 設置讀取超時時間為2秒,超過2秒未獲得數據時readline報超時異常;不設置即進行永久等待 //socket.setSoTimeout(2000); // 是否復用未完全關閉的Socket地址,對於指定bind操作後的套接字有效 socket.setReuseAddress(true); // 是否開啟Nagle演算法 socket.setTcpNoDelay(true); // 是否需要在長時無數據響應時發送確認數據(類似心跳包),時間大約為2小時 socket.setKeepAlive(true); // 對於close關閉操作行為進行怎樣的處理;默認為false,0 // false、0:默認情況,關閉時立即返回,底層系統接管輸出流,將緩衝區內的數據發送完成 // true、0:關閉時立即返回,緩衝區數據拋棄,直接發送RST結束命令到對方,並無需經過2MSL等待 // true、200:關閉時最長阻塞200毫秒,隨後按第二情況處理 socket.setSoLinger(true, 20); // 是否讓緊急數據內斂,默認false;緊急數據通過 socket.sendUrgentData(1);發送 socket.setOOBInline(true); // 設置接收發送緩衝器大小 socket.setReceiveBufferSize(64 * 1024 * 1024); socket.setSendBufferSize(64 * 1024 * 1024); // 設置性能參數:短鏈接,延遲,頻寬的相對重要性 socket.setPerformancePreferences(1, 1, 1); } /** * 輸出流方法 */ private static void write(Socket socket) throws IOException { //構建鍵盤輸入流 InputStream in = System.in; BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in)); //得到socket輸出流並轉化為列印流 OutputStream outputStream = socket.getOutputStream(); PrintStream printStream = new PrintStream(outputStream); for(;;){ String str = bufferedReader.readLine();//從鍵盤輸入獲取內容 printStream.println(str);//通過列印流輸出 if(str.equalsIgnoreCase("exit")){ break; } } printStream.close(); System.out.println("輸出流關閉"); } /** * 輸入流執行緒 */ static class ReadHandle extends Thread { private final InputStream inputStream; private Boolean done = false; ReadHandle(InputStream inputStream) { this.inputStream = inputStream; } @Override public void run() { super.run(); try { //獲取輸入流 BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream)); do { String str; str = socketInput.readLine(); if (str==null) { break; } System.out.println("From server: "+ str); } while (!done); } catch (Exception e) { if (!done) { System.out.println("異常斷開,或者輸入異常"); } } } void exit() { done = true; try { inputStream.close(); } catch (IOException e) { e.printStackTrace(); }finally { System.out.println("輸入流關閉"); } } } }
關於程式碼的具體分析,由於程式碼已有很多注釋,博文中便不再贅述。
運行結果
運行結果如下所示
-
服務端
連接成功後,服務端每隔5秒向各個客戶端發送時間戳資訊,同時接收兩個客戶端發來的資訊 -
客戶端1
輸入「I am client1」並向服務端發送,同時接收服務端發來的時間戳資訊 -
客戶端2
輸入「I am client2」並向服務端發送,同時接收服務端發來的時間戳資訊
本篇部落格記錄一次實踐學習,使用多執行緒+socket編程,實現了單伺服器與多客戶端之間的數據收發並行,除此之外,通過思維流程圖,整理了程式碼的設計思路並展示出來。