Android | TCP的C(Java|Android)/S(Java)通信实战经典聊天室案例(文末附本案例代码实现概述、观察者模式实现小结)

  • 2019 年 11 月 5 日
  • 笔记

案例GitHub地址

创建TCP服务端

  • 在sample模块下, 新建一个名为tcp的package, 创建TcpServer:
  • 指定服务端端口号(ip 默认为本机ip) 启动循环读取消息队列的子线程, 死循环,不断等待客户端请求连接, 一旦连接上, 直接新建一个子线程(丢给ClientTask)去处理这个socket, 于是主线程又可以回到accept() 阻塞,等待下一个连接请求; 同时,将连接上的socket 对应的线程类,注册为消息队列的观察者, 让线程类担任观察者,负责接收被观察者的通知信息并做socket 通信。
/**   * <pre>   *     author : 李蔚蓬(简书_凌川江雪)   *     time   : 2019/10/30 16:57   *     desc   :指定服务端端口号(ip 默认为本机ip)   *             启动循环读取消息队列的子线程,   *             死循环,不断等待客户端请求连接,   *             一旦连接上,直接新建一个子线程(丢给ClientTask)去处理这个socket,   *             于是主线程又可以回到accept() 阻塞,等待下一个连接请求;   *             同时,将连接上的socket 对应的线程类,注册为消息队列的观察者,   *             让线程类担任观察者,负责接收被观察者的通知信息并做socket 通信   * </pre>   */  public class TcpServer {        public void start() {            ServerSocket serverSocket = null;          try {              serverSocket = new ServerSocket(9090);              MsgPool.getInstance().start();//启动读消息的子线程                while (true) {  //            /*  //            阻塞的方法!!!  等待(客户端的) TCP 连接请求  //            客户端有 TCP 请求并连接上了 ServerSocket,.  //            那 accept() 就会返回一个 同一连接上 对应 客户一端socket 的 服务一端socket  //             */                  Socket socket = serverSocket.accept();                    //客户端连接之后,打印相关信息  //            System.out.println("ip: " + socket.getInetAddress().getHostAddress() +  //                    ", port = " + socket.getPort() + " is online...");                  System.out.println("ip = " + "***.***.***.***" +                          ", port = " + socket.getPort() + " is online...");    //            /*  //                连接上了之后不能直接拿IO流去读写,  //                因为getInputStream() 和 getOutputStream() 都是阻塞的!!!!  //                如果直接拿IO 流,不做其他处理,  //                那么Server端的处理流程是这样的:  //                accept()-- getInputStream()处理第一个客户端 -- 处理完毕,accept()-- getInputStream()处理第二个客户端....  //                所以必须开启子线程去读写客户端,才能做成聊天室  //  //                针对每一个连接上来的客户端去单独起一个线程,跟客户端进行通信  //  //                过程:客户端连上之后,打印其信息,  //                然后直接新建一个子线程(丢给ClientTask)去处理这个socket,  //                于是主线程又可以回到accept() 阻塞,等待下一个连接请求  //             */                  ClientTask clientTask = new ClientTask(socket);                  MsgPool.getInstance().addMsgComingListener(clientTask);                  clientTask.start();                  }              } catch (IOException e) {              e.printStackTrace();          }      }        public static void main(String[] args) {          new TcpServer().start();      }  }
  • 针对每一个连接上来的客户端去单独起一个线程,跟客户端进行通信, 准备一个线程类,名为ClientTask, 针对每一个连接上来的客户端去单独起一个线程,跟客户端进行通信, 这里便是线程类; run()中死循环不断读取本类实例对应的客户端发来的信息, 或者发送给对应的连接对面客户端(服务端)要发送的信息; 实现MsgPool.MsgComingListener, 成为消息队列的观察者!!!
/**   * <pre>   *     author : 李蔚蓬(简书_凌川江雪)   *     time   : 2019/10/30 17:23   *     desc   :针对每一个连接上来的客户端去单独起一个线程,跟客户端进行通信,   *             这里便是线程类;   *             run()中死循环不断读取客户端发来的信息,发送给客户端(服务端)要发送的信息;   *             实现MsgPool.MsgComingListener, 成为消息队列的观察者!!!   * </pre>   */  public class ClientTask extends Thread implements MsgPool.MsgComingListener {        private Socket mSocket;      private InputStream mIs;      private OutputStream mOs;        public ClientTask(Socket socket) {            try {              mSocket = socket;              mIs = socket.getInputStream();              mOs = socket.getOutputStream();          } catch (IOException e) {              e.printStackTrace();          }        }        @Override      public void run() {          BufferedReader br = new BufferedReader(new InputStreamReader(mIs));            String line = null;          /*              读取并输出客户端信息。              如果没有客户端发送信息,readLine() 便会阻塞在原地           */          try {              while ((line = br.readLine()) != null) {                  System.out.println("read " + mSocket.getPort() + " = " + line);                  //把信息发送加入到消息队列,                  // 借助消息队列的被观察者通知方法,                  // 将消息转发至其他Socket(所有socket都在创建ClientTask的时候,                  // 备注成为MsgPool 的观察者)                  MsgPool.getInstance().sendMsg(mSocket.getPort() + ": " + line);              }          } catch (IOException e) {              e.printStackTrace();          }        }        //作为消息队列的观察者对应的更新方法,      // 消息队列中最新的消息会推送通知到这里的msg参数,      // 这里拿到最新的推送消息后,写进输出流,      // 推到TCP 连接的客户一端的 socket      @Override      public void onMsgComing(String msg) {          try {              mOs.write(msg.getBytes());              mOs.write("n".getBytes());              mOs.flush();          } catch (IOException e) {              e.printStackTrace();          }      }  }
  • 准备一个消息队列, 每一个Client发送过来的消息, 都会被加入到队列当中去, 队列中默认有一个子线程, 专门从队列中,死循环,不断去取数据(取出队列的队头), 取到数据就做相关处理,比如分发给其他的socket;
/**   * <pre>   *     author : 李蔚蓬(简书_凌川江雪)   *     time   : 2019/10/30 17:45   *     desc   :每一个Client发送过来的消息,   *             都会被加入到队列当中去,   *             队列中默认有一个子线程,   *             专门从队列中,死循环,不断去取数据,   *             取到数据就做相关处理,比如分发给其他的socket;   * </pre>   */  public class MsgPool {        private static MsgPool mInstance = new MsgPool();        /*          这里默认消息是String类型,          或者可以自行封装一个Model 类,存储更详细的信息            block n.块; 街区;障碍物,阻碍          顾名思义,这是一个阻塞的队列,当有消息过来时,就把消息发送给这个队列,          这边会起一个线程专门从队列里面去取消息,          如果队列中没有消息,就会阻塞在原地       */      private LinkedBlockingQueue<String> mQueue = new LinkedBlockingQueue<>();        public static MsgPool getInstance() {          return mInstance;      }        private MsgPool() {      }        //这是一个阻塞的队列,      // 当有消息过来时,即客户端接收到消息时,      // 就把消息发送(添加)到这个队列中      //现在所有的客户端都可以发送消息到这个队列中      public void sendMsg(String msg) {          try {              mQueue.put(msg);          } catch (InterruptedException e) {              e.printStackTrace();          }        }        //要一早就调用本方法,      // 启动这个读取消息的线程,在后台不断运行      public void start() {          //开启一个线程去读队列的数据          new Thread() {              @Override              public void run() {                  //无限循环读取信息                  while (true) {                      try {                          //取出并移除队头;没有消息时,take()是阻塞的                          String msg = mQueue.take();                          notifyMsgComing(msg);                      } catch (InterruptedException e) {                          e.printStackTrace();                      }                  }              }          }.start();      }        //被观察者方法,遍历所有已注册的观察者,一次性通知更新      private void notifyMsgComing(String msg) {          for (MsgComingListener listener : mListeners) {              listener.onMsgComing(msg);          }      }        //观察者接口      public interface MsgComingListener {          void onMsgComing(String msg);//更新方法      }        //被观察者,存放观察者      private List<MsgComingListener> mListeners = new ArrayList<>();        //被观察者方法,添加观察者到列表      public void addMsgComingListener(MsgComingListener listener) {          mListeners.add(listener);      }  }

所有的客户端都可发送消息到队列中, 然后所有的客户端都在等待 消息队列的消息新增(mQueue.put())这个时刻, 消息队列一新增消息, 即一接收到某个客户端发送过来消息(mQueue.put()), 则消息都会一次性转发给所有客户端, 所以这里涉及到一个观察者设计模式, 消息队列(MsgPool)或消息(Msg)是被观察者, 所有客户端处理线程(ClientTask)都是观察者

观察者模式实现小结: 观察者接口准备更新(数据或UI的)方法; 被观察者接口准备三个抽象方法; 观察者实现类具体实现更新逻辑,可以有参数,参数为更新需要的数据; 被观察者实现类准备一个观察者List以及实现三个方法: 1.观察者注册方法: 参数为某观察者,功能是把观察者参数加到观察者List中; 2.注销观察者方法: 参数为某观察者,功能是把观察者参数从观察者List中移除; 3.通知观察者方法:无参数或者把需要通知的数据作为参数, 功能是遍历所有已注册的观察者, 即遍历 注册添加到 观察者List中的观察者,逐个调用List中所有观察者的更新方法;即一次性更新所有已注册的观察者! 使用时, 实例化一个被观察者和若干个观察者, 将所有观察者注册到被观察者处, 调用被观察者的通知方法,一次性更新所有已注册的观察者!

创建TCP客户端

  • 创建两个Package,整理一下项目架构,创建一个TcpClient:
/**   * <pre>   *     author : 李蔚蓬(简书_凌川江雪)   *     time   : 2019/10/31 15:36   *     desc   :   * </pre>   */  public class TcpClient {        private Scanner mScanner;        public TcpClient() {          mScanner = new Scanner(System.in);          mScanner.useDelimiter("n");      }        /**       * 配置socket       * 准备IO 流,       * 主线程写,子线程读       *       */      public void start() {          try {              Socket socket = new Socket("***", 9090);              InputStream is = socket.getInputStream();              OutputStream os = socket.getOutputStream();                final BufferedReader br = new BufferedReader(new InputStreamReader(is));              BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os));                /*                  实现:                  通过 reader,                  在任何时候 能够读到 Server端 发来的数据                  通过 writer,                  在任何时候 能够向 Server端 去写数据               */              //在等待客户端 发送消息过来的话,这里是需要阻塞的,              // 阻塞的时候又没有办法向客户端发送数据,所以读写独立的话,肯定是要起线程的                //起一个线程,专门用于              // 读Server 端 发来的数据,数据一过来就读然后输出,              // 输出服务端发送的数据              new Thread() {                  @Override                  public void run() {                        try {                          String line = null;                          while ((line = br.readLine()) != null) {                              System.out.println(line);                          }                      } catch (IOException e) {                      }                  }              }.start();                //给Server端 发送数据              while (true) {                  //next() 是阻塞的,不断地读控制面板,有数据就会通过bufferWriter,                  // 即outputStream 写给Server                  String msg = mScanner.next();                  bw.write(msg);                  bw.newLine();                  bw.flush();              }            } catch (IOException e) {              e.printStackTrace();          }      }        public static void main(String[] args) {          new TcpClient().start();      }  }
  • 反复测试:

移植客户端到Android移动端

  • 复制TcpClient到biz包下迭代,名为TcpClientBiz:

  • rename一下MainActivity为UdpActivity:

复制UdpActivity一份,原地粘贴,命名为TcpActivity:


更改启动页面:

  • 反复测试(一个模拟机和两台真机的聊天哈哈哈):
  • 最终Server端聊天记录:
服务端诸类代码实现概述(TcpServer、ClientTask、MsgPool)
  • TcpServer: 死循环,阻塞,等待客户端请求连接,while (true) & .accept(); 一旦连接上,获取对应的socket对象并 把它丢给ClientTask的构造方法,new ClientTask(socket) 直接新建一个子线程,去处理这个socket(.start()), 将连接上的socket 对应的线程类,注册到消息队列类中的队列中, 成为消息队列的观察者;MsgPool.getInstance().addMsgComingListener(clientTask) 启动消息队列读读队列的线程, MsgPool.getInstance().start();
  • ClientTask: public class ClientTask extends Thread implements MsgPool.MsgComingListener 让线程类作为消息队列的观察者, 负责接收被观察者的通知信息并做socket 通信; 类中:
    • 1/3 构造方法: 接收TcpServer对过来的socket对象, 用之初始化其IO流;
    • 2/3 run()<读取Client的 I流,加入 MsgPool.mQueue> 封装输入流, 读取客户端发送过来的信息并输出: while ((line = br.readLine()) != null){...} System.out.println(...); 把信息发送加入到消息队列:MsgPool.getInstance().sendMsg(...); 如果没有客户端发送信息, readLine() 便会阻塞(注意这里会阻塞!所以要放在子线程!)在原地
    • 3/3 onMsgComing(String msg)<取出 MsgPool.mQueue,写入Client的 O流> 作为消息队列的观察者对应的更新方法, 消息队列中最新的消息会推送通知到这里的msg参数, (消息队列类有一个子线程死循环阻塞读取队头, String msg = mQueue.take(); notifyMsgComing(msg); notifyMsgComing中遍历所有已注册的观察者, 遍历时调用观察者的onMsgComing(msg) 正是本方法!!!) 本方法中拿到最新的推送消息后, 写进输出流, 发送给对应的 TCP 连接的客户一端socket
  • class MsgPool消息列表类
    • 实现单例模式 private static MsgPool mInstance = new MsgPool(); public static MsgPool getInstance() { return mInstance; } private MsgPool() { }
    • 准备消息列表底层数据结构: private LinkedBlockingQueue<String> mQueue = new LinkedBlockingQueue<>();
    • sendMsg(String msg): 当有消息过来时,即客户端接收到消息时, 就把消息发送(添加)到消息队列中:mQueue.put(msg); ClientTaskrun()调用本方法!!!;
    • start() 启动读取消息的子线程,在后台不断运行, 死循环 阻塞 读取队头, 一有消息取出就通知所有已注册的观察者, String msg = mQueue.take(); notifyMsgComing(msg); 在TcpServer中一开始配置好服务ip和端口就调用了;
    • 实现被观察者通知方法:notifyMsgComing(String msg) 实现被观察者方法,添加观察者到列表: public void addMsgComingListener(MsgComingListener listener) 观察者接口MsgComingListener 被观察者列表private List<MsgComingListener> mListeners = new ArrayList<>();
客户端诸类代码实现概述(TcpClientBiz、TcpActivity)
  • TcpClientBiz: 连接Server端, 后台子线程不断接收Server端发送过来的信息, 前台封装、提供向Server端发送信息的方法
    • 准备一个绑定了mainLooperHandler
    • 定义<回调机制> 回调接口及其抽象方法; 声明 全局 回调接口变量; 回调接口置入函数; setOnMsgComingListener(onMsgComingListener listener) { mListener = listener; }
    • 构造方法:
      • 开启子线程!!!, 配置连接到Server端的socket;mSocket = new Socket("***.**.*.**", 9090);
      • 通过socket获得IO流; (以上,socket,IO流都初始化给全局变量)
      • 使用全局 回调接口变量, 抽象调用业务方法;(Toast提醒、Error处理之类)
      • 调用readServerMsg()!!!;
    • readServerMsg() 一旦本类被实例化,就会被启动!!! 开启一个子线程, 拿着全局变量I流,封装成BufferReader, 死循环 阻塞等待 读取Server端信息 while ((line = br.readLine()) != null) 一旦有信息, 借助Handler.post(), 使用全局 回调接口变量抽象调用接口方法onMsgComing() 通过回调机制交给Activity层处理;
    • sendMsg(final String msg) 开启一个子线程, 拿着全局变量O流,封装成BufferWriter, 把参数msg 写入BufferWriter(O流),发送给Server端; 调用时机:在要发送消息给Server 的时候调用, 一般是EditText 右边的按钮被点击的时候
    • onDestroy(): 回收socket、IO流
  • TcpActivity 主要是各种组件的配置, 注意几点即可:
    • 需要实例化一个全局TcpClientBiz实例 然后用匿名内部类实现回调接口及其方法, 再set 给TcpClientBiz实例;
    • 点击按钮时把EditText的内容发送给Server端; msg = mEtMsg.getText().toString(); mTcpClientBiz.sendMsg(msg);
    • onDestroy()中调用mTcpClientBiz.onDestroy();回收资源

所有的客户端都可发送消息到队列中, 然后所有的客户端都在等待 消息队列的消息新增(mQueue.put())这个时刻, 消息队列一新增消息, 即一接收到某个客户端发送过来消息(mQueue.put()), 则消息都会一次性转发给所有客户端, 所以这里涉及到一个观察者设计模式, 消息队列(MsgPool)或消息(Msg)是被观察者, 所有客户端处理线程(ClientTask)都是观察者

观察者模式实现小结: 观察者接口准备更新(数据或UI的)方法; 被观察者接口准备三个抽象方法; 观察者实现类具体实现更新逻辑,可以有参数,参数为更新需要的数据; 被观察者实现类准备一个观察者List以及实现三个方法: 1.观察者注册方法: 参数为某观察者,功能是把观察者参数加到观察者List中; 2.注销观察者方法: 参数为某观察者,功能是把观察者参数从观察者List中移除; 3.通知观察者方法:无参数或者把需要通知的数据作为参数, 功能是遍历所有已注册的观察者, 即遍历 注册添加到 观察者List中的观察者,逐个调用List中所有观察者的更新方法;即一次性更新所有已注册的观察者! 使用时, 实例化一个被观察者和若干个观察者, 将所有观察者注册到被观察者处, 调用被观察者的通知方法,一次性更新所有已注册的观察者!