java网络编程——多线程数据收发并行

  • 2020 年 3 月 18 日
  • 笔记

基本介绍与思路

收发并行

前一篇博客中,完成了客户端与服务端的简单TCP交互,但这种交互是触发式的:客户端发送一条消息,服务端收到后再回送一条。没有做到收发并行。收发并行的字面意思很容易理解,即数据的发送与接收互相不干扰,相互独立。当然,要保证服务端和客户端都能做到收发并行。

业务逻辑

脱离业务逻辑的实践是毫无意义的,先描述一下本实践中的业务逻辑:一个服务端接受多个客户端的连接,连接后,向各个客户端定时发送时间戳数据,同时在并行条件下,接受各个客户端发送来的数据并显示;客户端键盘输入字符串,发送给服务端,同时在并行条件下,接收服务器发来的时间戳数据并显示。

实现思路

实现发送与接收并行,思路其实非常直观,即建立两个线程,分别用来实现输入流和输出流。我的代码的设计方案如下图所示:
image

  • 服务端:创建一个监听客户端连接的线程,线程中一旦接收到请求,创建一个对应该客户端收发处理的对象,对象中创建输入流线程,并使用单例线程池创建输出流线程。主线程使用键盘输入流System.in来进行阻塞。同时主线程中创建Timer定时器,定时向输出流发送数据。
  • 客户端:主线程发送连接请求,与服务器建立连接。使用键盘输入流System.in来阻塞主线程,同时作为输出流使用;创建一个输入流线程,异步运行,接收服务器数据。

代码分析

源代码文件结构如下图所示
image

服务端

服务器端分为三个部分,分别是Server.java,TCPServer.java和ClientHandler.java

Server.java
package Server;    import java.io.BufferedReader;  import java.io.IOException;  import java.io.InputStreamReader;  import java.text.SimpleDateFormat;  import java.util.TimerTask;  import java.util.Timer;  import java.util.Date;    public class Server {      private static SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd_HH:mm:ss");      public static void main(String[] args){          try {              TCPServer.accept();              new Timer("Timer").schedule(new TimerTask() {                  @Override                  public void run() {                      TCPServer.broadcast(df.format(new Date()));                  }              }, 1000,5000);              BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));              String str;              //因为ClientListen是异步线程,使用键盘输入流将主线程阻塞住,保证跟ClientListen线程同步,同时可控制ClientListen服务的退出              do{                  str = bufferedReader.readLine();              }while (str.equalsIgnoreCase("serverExit"));          }catch (Exception e){              System.out.println("监听请求过程中异常退出");          }            try {              TCPServer.stop();          } catch (IOException e) {              System.out.println("关闭套接字过程中出现异常");          } finally {              System.out.println("服务器端套接字已关闭!");          }      }    }
TCPServer.java
package Server;    import java.io.IOException;  import java.net.*;  import java.util.ArrayList;  import java.util.UUID;    class TCPServer {      private static int LOCAL_PORT = 3001;      private static ClientListenHandle clientListenHandle;      private static ArrayList<ClientHandler> clientHandlerList = new ArrayList<ClientHandler>();        static void accept() throws IOException {          //创建服务器端套接字          ServerSocket serverSocket = createSocket();          InitSocket(serverSocket);          System.out.println("服务器准备就绪 addr: " + Inet4Address.getLocalHost() + "  /port: " + LOCAL_PORT);          System.out.println("开始监听客户端连接...");          //创建线程监听客户端请求          clientListenHandle = new ClientListenHandle(serverSocket);          clientListenHandle.start();        }        static void stop() throws IOException {          for (ClientHandler clientHandler : clientHandlerList) {              clientHandler.socketClose();          }          clientHandlerList.clear();          clientListenHandle.exit();      }        private static ServerSocket createSocket() throws IOException {          ServerSocket socket = new ServerSocket(LOCAL_PORT, 50);          return socket;      }        private static void InitSocket(ServerSocket socket) throws SocketException {          // 是否复用未完全关闭的地址端口          socket.setReuseAddress(true);            // 等效Socket#setReceiveBufferSize          socket.setReceiveBufferSize(64 * 1024 * 1024);            // 设置serverSocket#accept超时时间,不设置即永久等待          // serverSocket.setSoTimeout(2000);            // 设置性能参数:短链接,延迟,带宽的相对重要性          socket.setPerformancePreferences(1, 1, 1);      }        static void broadcast(String msg) {          for (ClientHandler clientHandler : clientHandlerList) {              clientHandler.write(msg);          }        }      /**       * 监听客户端请求的线程       */      static class ClientListenHandle extends Thread {          private final ServerSocket serverSocket;          private Boolean done = false;            ClientListenHandle(ServerSocket serverSocket) {              this.serverSocket = serverSocket;          }            @Override          public void run() {              super.run();              try {                  do {                      Socket client;                      try {                          client = serverSocket.accept();                      } catch (Exception e) {                          continue;//某一个客户端连接失败,要保证其它客户端能正常连接                      }                      String uuid = UUID.randomUUID().toString();//为客户端生成唯一标识                      System.out.println("已接受连接client:"+uuid+" /Addr:"+client.getInetAddress()+" /Port:"+client.getPort());                      //为该客户端实例化一个ClientHandler对象,注入对象删除操作的lambda表达式                      ClientHandler clientHandle = new ClientHandler(client, handler -> clientHandlerList.remove(handler), uuid);                      clientHandle.read();                      clientHandlerList.add(clientHandle);                  } while (!done);              } catch (Exception e) {                  if (!done) {                      System.out.println("异常退出!");                  }              }          }            void exit() throws IOException {              done = true;              serverSocket.close();          }      }  }  
ClientHandler.java
package Server;    import java.io.*;  import java.net.Socket;  import java.util.concurrent.Executor;  import java.util.concurrent.ExecutorService;  import java.util.concurrent.Executors;    public class ClientHandler {      private final Socket client;      private final ReadHandler readHandler;      private final WriteHandle writeHandler;      private final Removable removable;      private final String uid;        ClientHandler(Socket socket, Removable removable, String uid) throws IOException {          this.client = socket;          this.readHandler = new ReadHandler(socket.getInputStream());          this.writeHandler = new WriteHandle(socket.getOutputStream());          this.removable = removable;          this.uid = uid;      }        void read() {          readHandler.start();      }        void write(String msg) {          System.out.println("Server -->> " + uid + " : " + msg);          writeHandler.write(msg);      }        /**       * 把输入输出流和套接字都关闭       */      void socketClose(){          try {              readHandler.exit();              writeHandler.exit();              client.close();          } catch (IOException e) {              e.printStackTrace();          }finally {              System.out.println("客户端:"+uid+" 套接字连接已关闭");          }      }      /**       * 把自身从对象列表中清除掉,具体方法是使用lambda表达式来注入的       */      void removeClientHandler() {          removable.removeClientHandle(this);      }        /**       * 定义一个接口,接收lambda表达式       */      interface Removable {          void removeClientHandle(ClientHandler clientHandler);      }        /**       * 输入流操作线程       */      class ReadHandler extends Thread {          private final InputStream inputStream;          private Boolean flag = true;            ReadHandler(InputStream inputStream) {              this.inputStream = inputStream;          }            @Override          public void run() {              super.run();                BufferedReader socketInput = null;              try {                  socketInput = new BufferedReader(new InputStreamReader(inputStream));                  do {                        String str = socketInput.readLine();                      //不知道为什么,客户端关闭时,这里直接报异常,获取不到null                      if (str.equalsIgnoreCase("exit")) {                          System.out.println("已无法读取客户端数据!");                          throw new Exception();                      }                      System.out.println(uid + " -->> server : " + str);                  } while (flag);              } catch (Exception e) {                  if (flag) {                      System.out.println("读取客户端过程中异常退出");                      ClientHandler.this.removeClientHandler();                      ClientHandler.this.socketClose();                  }              }          }            void exit() throws IOException {              flag = false;              inputStream.close();          }      }        /**       * 输出流操作线程,使用单例线程池,可以自动等待任务并处理,无需人工添加阻塞操作       */      class WriteHandle {          private final OutputStream outputStream;          private final ExecutorService executorService;            WriteHandle(OutputStream outputStream) {              this.outputStream = outputStream;              this.executorService = Executors.newSingleThreadExecutor();          }          private void write(String msg){              executorService.execute(new WriteRunnable(msg,outputStream));          }          void exit() throws IOException{              outputStream.close();              executorService.shutdown();          }          class WriteRunnable implements Runnable{              private final String msg;              private final PrintStream printStream;                WriteRunnable(String msg, OutputStream outputStream) {                  this.msg = msg;                  this.printStream = new PrintStream(outputStream);              }                @Override              public void run() {                  try {                      printStream.println(msg);                  } catch (Exception e) {                      System.out.println("打印输出异常!");                  }                }          }      }  }  

客户端

Client.java
package Client;    import java.io.*;  import java.util.UUID;    import Client.bean.ServerInfo;  public class Client {      public static void main(String[] args)throws IOException {          ServerInfo serverInfo = new ServerInfo(UUID.randomUUID().toString(),"127.0.2.16",3001);          System.out.println("准备发起服务器连接...");          System.out.println("服务器信息:Addr:"+serverInfo.getAddress()+" /Port:"+serverInfo.getPort());            try {              TCPClient.connect(serverInfo);          }catch (Exception e){              System.out.println("连接失败,退出");          }      }  }  
TCPClient.java
package Client;    import Client.bean.ServerInfo;    import java.io.*;  import java.net.*;    class TCPClient {      static void connect(ServerInfo serverInfo) throws IOException {          Socket clientSocket = createSocket();//建立套接字            InitSocket(clientSocket);//初始化套接字          //连接远程服务器          clientSocket.connect(new InetSocketAddress(serverInfo.getAddress(), serverInfo.getPort()), 3000);          System.out.println("已连接server");          try {              //输入流线程              ReadHandle readHandle = new ReadHandle(clientSocket.getInputStream());              readHandle.start();                //输出流              write(clientSocket);              //当输出流结束时,关闭输入流              readHandle.exit();          } catch (Exception e) {              System.out.println("出现异常!");          } finally {              clientSocket.close();              System.out.println("客户端结束");          }      }        private static Socket createSocket() throws IOException {          Socket socket = new Socket();          return socket;      }        private static void InitSocket(Socket socket) throws SocketException {          // 设置读取超时时间为2秒,超过2秒未获得数据时readline报超时异常;不设置即进行永久等待          //socket.setSoTimeout(2000);          // 是否复用未完全关闭的Socket地址,对于指定bind操作后的套接字有效          socket.setReuseAddress(true);            // 是否开启Nagle算法          socket.setTcpNoDelay(true);            // 是否需要在长时无数据响应时发送确认数据(类似心跳包),时间大约为2小时          socket.setKeepAlive(true);            // 对于close关闭操作行为进行怎样的处理;默认为false,0          // false、0:默认情况,关闭时立即返回,底层系统接管输出流,将缓冲区内的数据发送完成          // true、0:关闭时立即返回,缓冲区数据抛弃,直接发送RST结束命令到对方,并无需经过2MSL等待          // true、200:关闭时最长阻塞200毫秒,随后按第二情况处理          socket.setSoLinger(true, 20);            // 是否让紧急数据内敛,默认false;紧急数据通过 socket.sendUrgentData(1);发送          socket.setOOBInline(true);            // 设置接收发送缓冲器大小          socket.setReceiveBufferSize(64 * 1024 * 1024);          socket.setSendBufferSize(64 * 1024 * 1024);            // 设置性能参数:短链接,延迟,带宽的相对重要性          socket.setPerformancePreferences(1, 1, 1);      }        /**       * 输出流方法       */      private static void write(Socket socket) throws IOException {          //构建键盘输入流          InputStream in = System.in;          BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in));          //得到socket输出流并转化为打印流          OutputStream outputStream = socket.getOutputStream();          PrintStream printStream = new PrintStream(outputStream);            for(;;){              String str = bufferedReader.readLine();//从键盘输入获取内容              printStream.println(str);//通过打印流输出              if(str.equalsIgnoreCase("exit")){                  break;              }          }            printStream.close();          System.out.println("输出流关闭");      }          /**       * 输入流线程       */      static class ReadHandle extends Thread {          private final InputStream inputStream;          private Boolean done = false;            ReadHandle(InputStream inputStream) {              this.inputStream = inputStream;          }            @Override          public void run() {              super.run();              try {                  //获取输入流                  BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream));                  do {                      String str;                      str = socketInput.readLine();                      if (str==null) {                          break;                      }                      System.out.println("From server: "+ str);                  } while (!done);              } catch (Exception e) {                  if (!done) {                      System.out.println("异常断开,或者输入异常");                  }              }          }            void exit() {              done = true;              try {                  inputStream.close();              } catch (IOException e) {                  e.printStackTrace();              }finally {                  System.out.println("输入流关闭");              }          }      }  }  

关于代码的具体分析,由于代码已有很多注释,博文中便不再赘述。

运行结果

运行结果如下所示

  • 服务端
    image
    连接成功后,服务端每隔5秒向各个客户端发送时间戳信息,同时接收两个客户端发来的信息

  • 客户端1
    image
    输入“I am client1”并向服务端发送,同时接收服务端发来的时间戳信息

  • 客户端2
    image
    输入“I am client2”并向服务端发送,同时接收服务端发来的时间戳信息

本篇博客记录一次实践学习,使用多线程+socket编程,实现了单服务器与多客户端之间的数据收发并行,除此之外,通过思维流程图,整理了代码的设计思路并展示出来。