java网络编程——多线程数据收发并行
- 2020 年 3 月 18 日
- 筆記
基本介绍与思路
收发并行
前一篇博客中,完成了客户端与服务端的简单TCP交互,但这种交互是触发式的:客户端发送一条消息,服务端收到后再回送一条。没有做到收发并行。收发并行的字面意思很容易理解,即数据的发送与接收互相不干扰,相互独立。当然,要保证服务端和客户端都能做到收发并行。
业务逻辑
脱离业务逻辑的实践是毫无意义的,先描述一下本实践中的业务逻辑:一个服务端接受多个客户端的连接,连接后,向各个客户端定时发送时间戳数据,同时在并行条件下,接受各个客户端发送来的数据并显示;客户端键盘输入字符串,发送给服务端,同时在并行条件下,接收服务器发来的时间戳数据并显示。
实现思路
实现发送与接收并行,思路其实非常直观,即建立两个线程,分别用来实现输入流和输出流。我的代码的设计方案如下图所示:
- 服务端:创建一个监听客户端连接的线程,线程中一旦接收到请求,创建一个对应该客户端收发处理的对象,对象中创建输入流线程,并使用单例线程池创建输出流线程。主线程使用键盘输入流System.in来进行阻塞。同时主线程中创建Timer定时器,定时向输出流发送数据。
- 客户端:主线程发送连接请求,与服务器建立连接。使用键盘输入流System.in来阻塞主线程,同时作为输出流使用;创建一个输入流线程,异步运行,接收服务器数据。
代码分析
源代码文件结构如下图所示
服务端
服务器端分为三个部分,分别是Server.java,TCPServer.java和ClientHandler.java
Server.java
package Server; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.text.SimpleDateFormat; import java.util.TimerTask; import java.util.Timer; import java.util.Date; public class Server { private static SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd_HH:mm:ss"); public static void main(String[] args){ try { TCPServer.accept(); new Timer("Timer").schedule(new TimerTask() { @Override public void run() { TCPServer.broadcast(df.format(new Date())); } }, 1000,5000); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in)); String str; //因为ClientListen是异步线程,使用键盘输入流将主线程阻塞住,保证跟ClientListen线程同步,同时可控制ClientListen服务的退出 do{ str = bufferedReader.readLine(); }while (str.equalsIgnoreCase("serverExit")); }catch (Exception e){ System.out.println("监听请求过程中异常退出"); } try { TCPServer.stop(); } catch (IOException e) { System.out.println("关闭套接字过程中出现异常"); } finally { System.out.println("服务器端套接字已关闭!"); } } }
TCPServer.java
package Server; import java.io.IOException; import java.net.*; import java.util.ArrayList; import java.util.UUID; class TCPServer { private static int LOCAL_PORT = 3001; private static ClientListenHandle clientListenHandle; private static ArrayList<ClientHandler> clientHandlerList = new ArrayList<ClientHandler>(); static void accept() throws IOException { //创建服务器端套接字 ServerSocket serverSocket = createSocket(); InitSocket(serverSocket); System.out.println("服务器准备就绪 addr: " + Inet4Address.getLocalHost() + " /port: " + LOCAL_PORT); System.out.println("开始监听客户端连接..."); //创建线程监听客户端请求 clientListenHandle = new ClientListenHandle(serverSocket); clientListenHandle.start(); } static void stop() throws IOException { for (ClientHandler clientHandler : clientHandlerList) { clientHandler.socketClose(); } clientHandlerList.clear(); clientListenHandle.exit(); } private static ServerSocket createSocket() throws IOException { ServerSocket socket = new ServerSocket(LOCAL_PORT, 50); return socket; } private static void InitSocket(ServerSocket socket) throws SocketException { // 是否复用未完全关闭的地址端口 socket.setReuseAddress(true); // 等效Socket#setReceiveBufferSize socket.setReceiveBufferSize(64 * 1024 * 1024); // 设置serverSocket#accept超时时间,不设置即永久等待 // serverSocket.setSoTimeout(2000); // 设置性能参数:短链接,延迟,带宽的相对重要性 socket.setPerformancePreferences(1, 1, 1); } static void broadcast(String msg) { for (ClientHandler clientHandler : clientHandlerList) { clientHandler.write(msg); } } /** * 监听客户端请求的线程 */ static class ClientListenHandle extends Thread { private final ServerSocket serverSocket; private Boolean done = false; ClientListenHandle(ServerSocket serverSocket) { this.serverSocket = serverSocket; } @Override public void run() { super.run(); try { do { Socket client; try { client = serverSocket.accept(); } catch (Exception e) { continue;//某一个客户端连接失败,要保证其它客户端能正常连接 } String uuid = UUID.randomUUID().toString();//为客户端生成唯一标识 System.out.println("已接受连接client:"+uuid+" /Addr:"+client.getInetAddress()+" /Port:"+client.getPort()); //为该客户端实例化一个ClientHandler对象,注入对象删除操作的lambda表达式 ClientHandler clientHandle = new ClientHandler(client, handler -> clientHandlerList.remove(handler), uuid); clientHandle.read(); clientHandlerList.add(clientHandle); } while (!done); } catch (Exception e) { if (!done) { System.out.println("异常退出!"); } } } void exit() throws IOException { done = true; serverSocket.close(); } } }
ClientHandler.java
package Server; import java.io.*; import java.net.Socket; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ClientHandler { private final Socket client; private final ReadHandler readHandler; private final WriteHandle writeHandler; private final Removable removable; private final String uid; ClientHandler(Socket socket, Removable removable, String uid) throws IOException { this.client = socket; this.readHandler = new ReadHandler(socket.getInputStream()); this.writeHandler = new WriteHandle(socket.getOutputStream()); this.removable = removable; this.uid = uid; } void read() { readHandler.start(); } void write(String msg) { System.out.println("Server -->> " + uid + " : " + msg); writeHandler.write(msg); } /** * 把输入输出流和套接字都关闭 */ void socketClose(){ try { readHandler.exit(); writeHandler.exit(); client.close(); } catch (IOException e) { e.printStackTrace(); }finally { System.out.println("客户端:"+uid+" 套接字连接已关闭"); } } /** * 把自身从对象列表中清除掉,具体方法是使用lambda表达式来注入的 */ void removeClientHandler() { removable.removeClientHandle(this); } /** * 定义一个接口,接收lambda表达式 */ interface Removable { void removeClientHandle(ClientHandler clientHandler); } /** * 输入流操作线程 */ class ReadHandler extends Thread { private final InputStream inputStream; private Boolean flag = true; ReadHandler(InputStream inputStream) { this.inputStream = inputStream; } @Override public void run() { super.run(); BufferedReader socketInput = null; try { socketInput = new BufferedReader(new InputStreamReader(inputStream)); do { String str = socketInput.readLine(); //不知道为什么,客户端关闭时,这里直接报异常,获取不到null if (str.equalsIgnoreCase("exit")) { System.out.println("已无法读取客户端数据!"); throw new Exception(); } System.out.println(uid + " -->> server : " + str); } while (flag); } catch (Exception e) { if (flag) { System.out.println("读取客户端过程中异常退出"); ClientHandler.this.removeClientHandler(); ClientHandler.this.socketClose(); } } } void exit() throws IOException { flag = false; inputStream.close(); } } /** * 输出流操作线程,使用单例线程池,可以自动等待任务并处理,无需人工添加阻塞操作 */ class WriteHandle { private final OutputStream outputStream; private final ExecutorService executorService; WriteHandle(OutputStream outputStream) { this.outputStream = outputStream; this.executorService = Executors.newSingleThreadExecutor(); } private void write(String msg){ executorService.execute(new WriteRunnable(msg,outputStream)); } void exit() throws IOException{ outputStream.close(); executorService.shutdown(); } class WriteRunnable implements Runnable{ private final String msg; private final PrintStream printStream; WriteRunnable(String msg, OutputStream outputStream) { this.msg = msg; this.printStream = new PrintStream(outputStream); } @Override public void run() { try { printStream.println(msg); } catch (Exception e) { System.out.println("打印输出异常!"); } } } } }
客户端
Client.java
package Client; import java.io.*; import java.util.UUID; import Client.bean.ServerInfo; public class Client { public static void main(String[] args)throws IOException { ServerInfo serverInfo = new ServerInfo(UUID.randomUUID().toString(),"127.0.2.16",3001); System.out.println("准备发起服务器连接..."); System.out.println("服务器信息:Addr:"+serverInfo.getAddress()+" /Port:"+serverInfo.getPort()); try { TCPClient.connect(serverInfo); }catch (Exception e){ System.out.println("连接失败,退出"); } } }
TCPClient.java
package Client; import Client.bean.ServerInfo; import java.io.*; import java.net.*; class TCPClient { static void connect(ServerInfo serverInfo) throws IOException { Socket clientSocket = createSocket();//建立套接字 InitSocket(clientSocket);//初始化套接字 //连接远程服务器 clientSocket.connect(new InetSocketAddress(serverInfo.getAddress(), serverInfo.getPort()), 3000); System.out.println("已连接server"); try { //输入流线程 ReadHandle readHandle = new ReadHandle(clientSocket.getInputStream()); readHandle.start(); //输出流 write(clientSocket); //当输出流结束时,关闭输入流 readHandle.exit(); } catch (Exception e) { System.out.println("出现异常!"); } finally { clientSocket.close(); System.out.println("客户端结束"); } } private static Socket createSocket() throws IOException { Socket socket = new Socket(); return socket; } private static void InitSocket(Socket socket) throws SocketException { // 设置读取超时时间为2秒,超过2秒未获得数据时readline报超时异常;不设置即进行永久等待 //socket.setSoTimeout(2000); // 是否复用未完全关闭的Socket地址,对于指定bind操作后的套接字有效 socket.setReuseAddress(true); // 是否开启Nagle算法 socket.setTcpNoDelay(true); // 是否需要在长时无数据响应时发送确认数据(类似心跳包),时间大约为2小时 socket.setKeepAlive(true); // 对于close关闭操作行为进行怎样的处理;默认为false,0 // false、0:默认情况,关闭时立即返回,底层系统接管输出流,将缓冲区内的数据发送完成 // true、0:关闭时立即返回,缓冲区数据抛弃,直接发送RST结束命令到对方,并无需经过2MSL等待 // true、200:关闭时最长阻塞200毫秒,随后按第二情况处理 socket.setSoLinger(true, 20); // 是否让紧急数据内敛,默认false;紧急数据通过 socket.sendUrgentData(1);发送 socket.setOOBInline(true); // 设置接收发送缓冲器大小 socket.setReceiveBufferSize(64 * 1024 * 1024); socket.setSendBufferSize(64 * 1024 * 1024); // 设置性能参数:短链接,延迟,带宽的相对重要性 socket.setPerformancePreferences(1, 1, 1); } /** * 输出流方法 */ private static void write(Socket socket) throws IOException { //构建键盘输入流 InputStream in = System.in; BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in)); //得到socket输出流并转化为打印流 OutputStream outputStream = socket.getOutputStream(); PrintStream printStream = new PrintStream(outputStream); for(;;){ String str = bufferedReader.readLine();//从键盘输入获取内容 printStream.println(str);//通过打印流输出 if(str.equalsIgnoreCase("exit")){ break; } } printStream.close(); System.out.println("输出流关闭"); } /** * 输入流线程 */ static class ReadHandle extends Thread { private final InputStream inputStream; private Boolean done = false; ReadHandle(InputStream inputStream) { this.inputStream = inputStream; } @Override public void run() { super.run(); try { //获取输入流 BufferedReader socketInput = new BufferedReader(new InputStreamReader(inputStream)); do { String str; str = socketInput.readLine(); if (str==null) { break; } System.out.println("From server: "+ str); } while (!done); } catch (Exception e) { if (!done) { System.out.println("异常断开,或者输入异常"); } } } void exit() { done = true; try { inputStream.close(); } catch (IOException e) { e.printStackTrace(); }finally { System.out.println("输入流关闭"); } } } }
关于代码的具体分析,由于代码已有很多注释,博文中便不再赘述。
运行结果
运行结果如下所示
-
服务端
连接成功后,服务端每隔5秒向各个客户端发送时间戳信息,同时接收两个客户端发来的信息 -
客户端1
输入“I am client1”并向服务端发送,同时接收服务端发来的时间戳信息 -
客户端2
输入“I am client2”并向服务端发送,同时接收服务端发来的时间戳信息
本篇博客记录一次实践学习,使用多线程+socket编程,实现了单服务器与多客户端之间的数据收发并行,除此之外,通过思维流程图,整理了代码的设计思路并展示出来。