java網路編程——多執行緒數據收發並行

  • 2020 年 3 月 18 日
  • 筆記

基本介紹與思路

收發並行

前一篇部落格中,完成了客戶端與服務端的簡單TCP交互,但這種交互是觸髮式的:客戶端發送一條消息,服務端收到後再回送一條。沒有做到收發並行。收發並行的字面意思很容易理解,即數據的發送與接收互相不干擾,相互獨立。當然,要保證服務端和客戶端都能做到收發並行。

業務邏輯

脫離業務邏輯的實踐是毫無意義的,先描述一下本實踐中的業務邏輯:一個服務端接受多個客戶端的連接,連接後,向各個客戶端定時發送時間戳數據,同時在並行條件下,接受各個客戶端發送來的數據並顯示;客戶端鍵盤輸入字元串,發送給服務端,同時在並行條件下,接收伺服器發來的時間戳數據並顯示。

實現思路

實現發送與接收並行,思路其實非常直觀,即建立兩個執行緒,分別用來實現輸入流和輸出流。我的程式碼的設計方案如下圖所示:
image

  • 服務端:創建一個監聽客戶端連接的執行緒,執行緒中一旦接收到請求,創建一個對應該客戶端收發處理的對象,對象中創建輸入流執行緒,並使用單例執行緒池創建輸出流執行緒。主執行緒使用鍵盤輸入流System.in來進行阻塞。同時主執行緒中創建Timer定時器,定時向輸出流發送數據。
  • 客戶端:主執行緒發送連接請求,與伺服器建立連接。使用鍵盤輸入流System.in來阻塞主執行緒,同時作為輸出流使用;創建一個輸入流執行緒,非同步運行,接收伺服器數據。

程式碼分析

源程式碼文件結構如下圖所示
image

服務端

伺服器端分為三個部分,分別是Server.java,TCPServer.java和ClientHandler.java

Server.java
package Server;    import java.io.BufferedReader;  import java.io.IOException;  import java.io.InputStreamReader;  import java.text.SimpleDateFormat;  import java.util.TimerTask;  import java.util.Timer;  import java.util.Date;    public class Server {      private static SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd_HH:mm:ss");      public static void main(String[] args){          try {              TCPServer.accept();              new Timer("Timer").schedule(new TimerTask() {                  @Override                  public void run() {                      TCPServer.broadcast(df.format(new Date()));                  }              }, 1000,5000);              BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));              String str;              //因為ClientListen是非同步執行緒,使用鍵盤輸入流將主執行緒阻塞住,保證跟ClientListen執行緒同步,同時可控制ClientListen服務的退出              do{                  str = bufferedReader.readLine();              }while (str.equalsIgnoreCase("serverExit"));          }catch (Exception e){              System.out.println("監聽請求過程中異常退出");          }            try {              TCPServer.stop();          } catch (IOException e) {              System.out.println("關閉套接字過程中出現異常");          } finally {              System.out.println("伺服器端套接字已關閉!");          }      }    }
TCPServer.java
package Server;    import java.io.IOException;  import java.net.*;  import java.util.ArrayList;  import java.util.UUID;    class TCPServer {      private static int LOCAL_PORT = 3001;      private static ClientListenHandle clientListenHandle;      private static ArrayList<ClientHandler> clientHandlerList = new ArrayList<ClientHandler>();        static void accept() throws IOException {          //創建伺服器端套接字          ServerSocket serverSocket = createSocket();          InitSocket(serverSocket);          System.out.println("伺服器準備就緒 addr: " + Inet4Address.getLocalHost() + "  /port: " + LOCAL_PORT);          System.out.println("開始監聽客戶端連接...");          //創建執行緒監聽客戶端請求          clientListenHandle = new ClientListenHandle(serverSocket);          clientListenHandle.start();        }        static void stop() throws IOException {          for (ClientHandler clientHandler : clientHandlerList) {              clientHandler.socketClose();          }          clientHandlerList.clear();          clientListenHandle.exit();      }        private static ServerSocket createSocket() throws IOException {          ServerSocket socket = new ServerSocket(LOCAL_PORT, 50);          return socket;      }        private static void InitSocket(ServerSocket socket) throws SocketException {          // 是否復用未完全關閉的地址埠          socket.setReuseAddress(true);            // 等效Socket#setReceiveBufferSize          socket.setReceiveBufferSize(64 * 1024 * 1024);            // 設置serverSocket#accept超時時間,不設置即永久等待          // serverSocket.setSoTimeout(2000);            // 設置性能參數:短鏈接,延遲,頻寬的相對重要性          socket.setPerformancePreferences(1, 1, 1);      }        static void broadcast(String msg) {          for (ClientHandler clientHandler : clientHandlerList) {              clientHandler.write(msg);          }        }      /**       * 監聽客戶端請求的執行緒       */      static class ClientListenHandle extends Thread {          private final ServerSocket serverSocket;          private Boolean done = false;            ClientListenHandle(ServerSocket serverSocket) {              this.serverSocket = serverSocket;          }            @Override          public void run() {              super.run();              try {                  do {                      Socket client;                      try {                          client = serverSocket.accept();                      } catch (Exception e) {                          continue;//某一個客戶端連接失敗,要保證其它客戶端能正常連接                      }                      String uuid = UUID.randomUUID().toString();//為客戶端生成唯一標識                      System.out.println("已接受連接client:"+uuid+" /Addr:"+client.getInetAddress()+" /Port:"+client.getPort());                      //為該客戶端實例化一個ClientHandler對象,注入對象刪除操作的lambda表達式                      ClientHandler clientHandle = new ClientHandler(client, handler -> clientHandlerList.remove(handler), uuid);                      clientHandle.read();                      clientHandlerList.add(clientHandle);                  } while (!done);              } catch (Exception e) {                  if (!done) {                      System.out.println("異常退出!");                  }              }          }            void exit() throws IOException {              done = true;              serverSocket.close();          }      }  }  
ClientHandler.java
package Server;    import java.io.*;  import java.net.Socket;  import java.util.concurrent.Executor;  import java.util.concurrent.ExecutorService;  import java.util.concurrent.Executors;    public class ClientHandler {      private final Socket client;      private final ReadHandler readHandler;      private final WriteHandle writeHandler;      private final Removable removable;      private final String uid;        ClientHandler(Socket socket, Removable removable, String uid) throws IOException {          this.client = socket;          this.readHandler = new ReadHandler(socket.getInputStream());          this.writeHandler = new WriteHandle(socket.getOutputStream());          this.removable = removable;          this.uid = uid;      }        void read() {          readHandler.start();      }        void write(String msg) {          System.out.println("Server -->> " + uid + " : " + msg);          writeHandler.write(msg);      }        /**       * 把輸入輸出流和套接字都關閉       */      void socketClose(){          try {              readHandler.exit();              writeHandler.exit();              client.close();          } catch (IOException e) {              e.printStackTrace();          }finally {              System.out.println("客戶端:"+uid+" 套接字連接已關閉");          }      }      /**       * 把自身從對象列表中清除掉,具體方法是使用lambda表達式來注入的       */      void removeClientHandler() {          removable.removeClientHandle(this);      }        /**       * 定義一個介面,接收lambda表達式       */      interface Removable {          void removeClientHandle(ClientHandler clientHandler);      }        /**       * 輸入流操作執行緒       */      class ReadHandler extends Thread {          private final InputStream inputStream;          private Boolean flag = true;            ReadHandler(InputStream inputStream) {              this.inputStream = inputStream;          }            @Override          public void run() {              super.run();                BufferedReader socketInput = null;              try {                  socketInput = new BufferedReader(new InputStreamReader(inputStream));                  do {                        String str = socketInput.readLine();                      //不知道為什麼,客戶端關閉時,這裡直接報異常,獲取不到null                      if (str.equalsIgnoreCase("exit")) {                          System.out.println("已無法讀取客戶端數據!");                          throw new Exception();                      }                      System.out.println(uid + " -->> server : " + str);                  } while (flag);              } catch (Exception e) {                  if (flag) {                      System.out.println("讀取客戶端過程中異常退出");                      ClientHandler.this.removeClientHandler();                      ClientHandler.this.socketClose();                  }              }          }            void exit() throws IOException {              flag = false;              inputStream.close();          }      }        /**       * 輸出流操作執行緒,使用單例執行緒池,可以自動等待任務並處理,無需人工添加阻塞操作       */      class WriteHandle {          private final OutputStream outputStream;          private final ExecutorService executorService;            WriteHandle(OutputStream outputStream) {              this.outputStream = outputStream;              this.executorService = Executors.newSingleThreadExecutor();          }          private void write(String msg){              executorService.execute(new WriteRunnable(msg,outputStream));          }          void exit() throws IOException{              outputStream.close();              executorService.shutdown();          }          class WriteRunnable implements Runnable{              private final String msg;              private final PrintStream printStream;                WriteRunnable(String msg, OutputStream outputStream) {                  this.msg = msg;                  this.printStream = new PrintStream(outputStream);              }                @Override              public void run() {                  try {                      printStream.println(msg);                  } catch (Exception e) {                      System.out.println("列印輸出異常!");                  }                }          }      }  }  

客戶端

Client.java
package Client;    import java.io.*;  import java.util.UUID;    import Client.bean.ServerInfo;  public class Client {      public static void main(String[] args)throws IOException {          ServerInfo serverInfo = new ServerInfo(UUID.randomUUID().toString(),"127.0.2.16",3001);          System.out.println("準備發起伺服器連接...");          System.out.println("伺服器資訊:Addr:"+serverInfo.getAddress()+" /Port:"+serverInfo.getPort());            try {              TCPClient.connect(serverInfo);          }catch (Exception e){              System.out.println("連接失敗,退出");          }      }  }  
TCPClient.java
package Client;    import Client.bean.ServerInfo;    import java.io.*;  import java.net.*;    class TCPClient {      static void connect(ServerInfo serverInfo) throws IOException {          Socket clientSocket = createSocket();//建立套接字            InitSocket(clientSocket);//初始化套接字          //連接遠程伺服器          clientSocket.connect(new InetSocketAddress(serverInfo.getAddress(), serverInfo.getPort()), 3000);          System.out.println("已連接server");          try {              //輸入流執行緒              ReadHandle readHandle = new ReadHandle(clientSocket.getInputStream());              readHandle.start();                //輸出流              write(clientSocket);              //當輸出流結束時,關閉輸入流              readHandle.exit();          } catch (Exception e) {              System.out.println("出現異常!");          } finally {              clientSocket.close();              System.out.println("客戶端結束");          }      }        private static Socket createSocket() throws IOException {          Socket socket = new Socket();          return socket;      }        private static void InitSocket(Socket socket) throws SocketException {          // 設置讀取超時時間為2秒,超過2秒未獲得數據時readline報超時異常;不設置即進行永久等待          //socket.setSoTimeout(2000);          // 是否復用未完全關閉的Socket地址,對於指定bind操作後的套接字有效          socket.setReuseAddress(true);            // 是否開啟Nagle演算法          socket.setTcpNoDelay(true);            // 是否需要在長時無數據響應時發送確認數據(類似心跳包),時間大約為2小時          socket.setKeepAlive(true);            // 對於close關閉操作行為進行怎樣的處理;默認為false,0          // false、0:默認情況,關閉時立即返回,底層系統接管輸出流,將緩衝區內的數據發送完成          // true、0:關閉時立即返回,緩衝區數據拋棄,直接發送RST結束命令到對方,並無需經過2MSL等待          // true、200:關閉時最長阻塞200毫秒,隨後按第二情況處理          socket.setSoLinger(true, 20);            // 是否讓緊急數據內斂,默認false;緊急數據通過 socket.sendUrgentData(1);發送          socket.setOOBInline(true);            // 設置接收發送緩衝器大小          socket.setReceiveBufferSize(64 * 1024 * 1024);          socket.setSendBufferSize(64 * 1024 * 1024);            // 設置性能參數:短鏈接,延遲,頻寬的相對重要性          socket.setPerformancePreferences(1, 1, 1);      }        /**       * 輸出流方法       */      private static void write(Socket socket) throws IOException {          //構建鍵盤輸入流          InputStream in = System.in;          BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in));          //得到socket輸出流並轉化為列印流          OutputStream outputStream = socket.getOutputStream();          PrintStream printStream = new PrintStream(outputStream);            for(;;){              String str = bufferedReader.readLine();//從鍵盤輸入獲取內容              printStream.println(str);//通過列印流輸出              if(str.equalsIgnoreCase("exit")){                  break;              }          }            printStream.close();          System.out.println("輸出流關閉");      }          /**       * 輸入流執行緒       */      static class ReadHandle extends Thread {          private final InputStream inputStream;          private Boolean done = false;            ReadHandle(InputStream inputStream) {              this.inputStream = inputStream;          }            @Override          public void run() {              super.run();              try {                  //獲取輸入流                  BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));                  do {                      String str;                      str = socketInput.readLine();                      if (str==null) {                          break;                      }                      System.out.println("From server: "+ str);                  } while (!done);              } catch (Exception e) {                  if (!done) {                      System.out.println("異常斷開,或者輸入異常");                  }              }          }            void exit() {              done = true;              try {                  inputStream.close();              } catch (IOException e) {                  e.printStackTrace();              }finally {                  System.out.println("輸入流關閉");              }          }      }  }  

關於程式碼的具體分析,由於程式碼已有很多注釋,博文中便不再贅述。

運行結果

運行結果如下所示

  • 服務端
    image
    連接成功後,服務端每隔5秒向各個客戶端發送時間戳資訊,同時接收兩個客戶端發來的資訊

  • 客戶端1
    image
    輸入「I am client1」並向服務端發送,同時接收服務端發來的時間戳資訊

  • 客戶端2
    image
    輸入「I am client2」並向服務端發送,同時接收服務端發來的時間戳資訊

本篇部落格記錄一次實踐學習,使用多執行緒+socket編程,實現了單伺服器與多客戶端之間的數據收發並行,除此之外,通過思維流程圖,整理了程式碼的設計思路並展示出來。