Android | TCP的C(Java|Android)/S(Java)通訊實戰經典聊天室案例(文末附本案例程式碼實現概述、觀察者模式實現小結)

  • 2019 年 11 月 5 日
  • 筆記

案例GitHub地址

創建TCP服務端

  • 在sample模組下, 新建一個名為tcp的package, 創建TcpServer:
  • 指定服務端埠號(ip 默認為本機ip) 啟動循環讀取消息隊列的子執行緒, 死循環,不斷等待客戶端請求連接, 一旦連接上, 直接新建一個子執行緒(丟給ClientTask)去處理這個socket, 於是主執行緒又可以回到accept() 阻塞,等待下一個連接請求; 同時,將連接上的socket 對應的執行緒類,註冊為消息隊列的觀察者, 讓執行緒類擔任觀察者,負責接收被觀察者的通知資訊並做socket 通訊。
/**   * <pre>   *     author : 李蔚蓬(簡書_凌川江雪)   *     time   : 2019/10/30 16:57   *     desc   :指定服務端埠號(ip 默認為本機ip)   *             啟動循環讀取消息隊列的子執行緒,   *             死循環,不斷等待客戶端請求連接,   *             一旦連接上,直接新建一個子執行緒(丟給ClientTask)去處理這個socket,   *             於是主執行緒又可以回到accept() 阻塞,等待下一個連接請求;   *             同時,將連接上的socket 對應的執行緒類,註冊為消息隊列的觀察者,   *             讓執行緒類擔任觀察者,負責接收被觀察者的通知資訊並做socket 通訊   * </pre>   */  public class TcpServer {        public void start() {            ServerSocket serverSocket = null;          try {              serverSocket = new ServerSocket(9090);              MsgPool.getInstance().start();//啟動讀消息的子執行緒                while (true) {  //            /*  //            阻塞的方法!!!  等待(客戶端的) TCP 連接請求  //            客戶端有 TCP 請求並連接上了 ServerSocket,.  //            那 accept() 就會返回一個 同一連接上 對應 客戶一端socket 的 服務一端socket  //             */                  Socket socket = serverSocket.accept();                    //客戶端連接之後,列印相關資訊  //            System.out.println("ip: " + socket.getInetAddress().getHostAddress() +  //                    ", port = " + socket.getPort() + " is online...");                  System.out.println("ip = " + "***.***.***.***" +                          ", port = " + socket.getPort() + " is online...");    //            /*  //                連接上了之後不能直接拿IO流去讀寫,  //                因為getInputStream() 和 getOutputStream() 都是阻塞的!!!!  //                如果直接拿IO 流,不做其他處理,  //                那麼Server端的處理流程是這樣的:  //                accept()-- getInputStream()處理第一個客戶端 -- 處理完畢,accept()-- getInputStream()處理第二個客戶端....  //                所以必須開啟子執行緒去讀寫客戶端,才能做成聊天室  //  //                針對每一個連接上來的客戶端去單獨起一個執行緒,跟客戶端進行通訊  //  //                過程:客戶端連上之後,列印其資訊,  //                然後直接新建一個子執行緒(丟給ClientTask)去處理這個socket,  //                於是主執行緒又可以回到accept() 阻塞,等待下一個連接請求  //             */                  ClientTask clientTask = new ClientTask(socket);                  MsgPool.getInstance().addMsgComingListener(clientTask);                  clientTask.start();                  }              } catch (IOException e) {              e.printStackTrace();          }      }        public static void main(String[] args) {          new TcpServer().start();      }  }
  • 針對每一個連接上來的客戶端去單獨起一個執行緒,跟客戶端進行通訊, 準備一個執行緒類,名為ClientTask, 針對每一個連接上來的客戶端去單獨起一個執行緒,跟客戶端進行通訊, 這裡便是執行緒類; run()中死循環不斷讀取本類實例對應的客戶端發來的資訊, 或者發送給對應的連接對面客戶端(服務端)要發送的資訊; 實現MsgPool.MsgComingListener, 成為消息隊列的觀察者!!!
/**   * <pre>   *     author : 李蔚蓬(簡書_凌川江雪)   *     time   : 2019/10/30 17:23   *     desc   :針對每一個連接上來的客戶端去單獨起一個執行緒,跟客戶端進行通訊,   *             這裡便是執行緒類;   *             run()中死循環不斷讀取客戶端發來的資訊,發送給客戶端(服務端)要發送的資訊;   *             實現MsgPool.MsgComingListener, 成為消息隊列的觀察者!!!   * </pre>   */  public class ClientTask extends Thread implements MsgPool.MsgComingListener {        private Socket mSocket;      private InputStream mIs;      private OutputStream mOs;        public ClientTask(Socket socket) {            try {              mSocket = socket;              mIs = socket.getInputStream();              mOs = socket.getOutputStream();          } catch (IOException e) {              e.printStackTrace();          }        }        @Override      public void run() {          BufferedReader br = new BufferedReader(new InputStreamReader(mIs));            String line = null;          /*              讀取並輸出客戶端資訊。              如果沒有客戶端發送資訊,readLine() 便會阻塞在原地           */          try {              while ((line = br.readLine()) != null) {                  System.out.println("read " + mSocket.getPort() + " = " + line);                  //把資訊發送加入到消息隊列,                  // 藉助消息隊列的被觀察者通知方法,                  // 將消息轉發至其他Socket(所有socket都在創建ClientTask的時候,                  // 備註成為MsgPool 的觀察者)                  MsgPool.getInstance().sendMsg(mSocket.getPort() + ": " + line);              }          } catch (IOException e) {              e.printStackTrace();          }        }        //作為消息隊列的觀察者對應的更新方法,      // 消息隊列中最新的消息會推送通知到這裡的msg參數,      // 這裡拿到最新的推送消息後,寫進輸出流,      // 推到TCP 連接的客戶一端的 socket      @Override      public void onMsgComing(String msg) {          try {              mOs.write(msg.getBytes());              mOs.write("n".getBytes());              mOs.flush();          } catch (IOException e) {              e.printStackTrace();          }      }  }
  • 準備一個消息隊列, 每一個Client發送過來的消息, 都會被加入到隊列當中去, 隊列中默認有一個子執行緒, 專門從隊列中,死循環,不斷去取數據(取出隊列的隊頭), 取到數據就做相關處理,比如分發給其他的socket;
/**   * <pre>   *     author : 李蔚蓬(簡書_凌川江雪)   *     time   : 2019/10/30 17:45   *     desc   :每一個Client發送過來的消息,   *             都會被加入到隊列當中去,   *             隊列中默認有一個子執行緒,   *             專門從隊列中,死循環,不斷去取數據,   *             取到數據就做相關處理,比如分發給其他的socket;   * </pre>   */  public class MsgPool {        private static MsgPool mInstance = new MsgPool();        /*          這裡默認消息是String類型,          或者可以自行封裝一個Model 類,存儲更詳細的資訊            block n.塊; 街區;障礙物,阻礙          顧名思義,這是一個阻塞的隊列,當有消息過來時,就把消息發送給這個隊列,          這邊會起一個執行緒專門從隊列裡面去取消息,          如果隊列中沒有消息,就會阻塞在原地       */      private LinkedBlockingQueue<String> mQueue = new LinkedBlockingQueue<>();        public static MsgPool getInstance() {          return mInstance;      }        private MsgPool() {      }        //這是一個阻塞的隊列,      // 當有消息過來時,即客戶端接收到消息時,      // 就把消息發送(添加)到這個隊列中      //現在所有的客戶端都可以發送消息到這個隊列中      public void sendMsg(String msg) {          try {              mQueue.put(msg);          } catch (InterruptedException e) {              e.printStackTrace();          }        }        //要一早就調用本方法,      // 啟動這個讀取消息的執行緒,在後台不斷運行      public void start() {          //開啟一個執行緒去讀隊列的數據          new Thread() {              @Override              public void run() {                  //無限循環讀取資訊                  while (true) {                      try {                          //取出並移除隊頭;沒有消息時,take()是阻塞的                          String msg = mQueue.take();                          notifyMsgComing(msg);                      } catch (InterruptedException e) {                          e.printStackTrace();                      }                  }              }          }.start();      }        //被觀察者方法,遍歷所有已註冊的觀察者,一次性通知更新      private void notifyMsgComing(String msg) {          for (MsgComingListener listener : mListeners) {              listener.onMsgComing(msg);          }      }        //觀察者介面      public interface MsgComingListener {          void onMsgComing(String msg);//更新方法      }        //被觀察者,存放觀察者      private List<MsgComingListener> mListeners = new ArrayList<>();        //被觀察者方法,添加觀察者到列表      public void addMsgComingListener(MsgComingListener listener) {          mListeners.add(listener);      }  }

所有的客戶端都可發送消息到隊列中, 然後所有的客戶端都在等待 消息隊列的消息新增(mQueue.put())這個時刻, 消息隊列一新增消息, 即一接收到某個客戶端發送過來消息(mQueue.put()), 則消息都會一次性轉發給所有客戶端, 所以這裡涉及到一個觀察者設計模式, 消息隊列(MsgPool)或消息(Msg)是被觀察者, 所有客戶端處理執行緒(ClientTask)都是觀察者

觀察者模式實現小結: 觀察者介面準備更新(數據或UI的)方法; 被觀察者介面準備三個抽象方法; 觀察者實現類具體實現更新邏輯,可以有參數,參數為更新需要的數據; 被觀察者實現類準備一個觀察者List以及實現三個方法: 1.觀察者註冊方法: 參數為某觀察者,功能是把觀察者參數加到觀察者List中; 2.註銷觀察者方法: 參數為某觀察者,功能是把觀察者參數從觀察者List中移除; 3.通知觀察者方法:無參數或者把需要通知的數據作為參數, 功能是遍歷所有已註冊的觀察者, 即遍歷 註冊添加到 觀察者List中的觀察者,逐個調用List中所有觀察者的更新方法;即一次性更新所有已註冊的觀察者! 使用時, 實例化一個被觀察者和若干個觀察者, 將所有觀察者註冊到被觀察者處, 調用被觀察者的通知方法,一次性更新所有已註冊的觀察者!

創建TCP客戶端

  • 創建兩個Package,整理一下項目架構,創建一個TcpClient:
/**   * <pre>   *     author : 李蔚蓬(簡書_凌川江雪)   *     time   : 2019/10/31 15:36   *     desc   :   * </pre>   */  public class TcpClient {        private Scanner mScanner;        public TcpClient() {          mScanner = new Scanner(System.in);          mScanner.useDelimiter("n");      }        /**       * 配置socket       * 準備IO 流,       * 主執行緒寫,子執行緒讀       *       */      public void start() {          try {              Socket socket = new Socket("***", 9090);              InputStream is = socket.getInputStream();              OutputStream os = socket.getOutputStream();                final BufferedReader br = new BufferedReader(new InputStreamReader(is));              BufferedWriter bw = new BufferedWriter(new OutputStreamWriter(os));                /*                  實現:                  通過 reader,                  在任何時候 能夠讀到 Server端 發來的數據                  通過 writer,                  在任何時候 能夠向 Server端 去寫數據               */              //在等待客戶端 發送消息過來的話,這裡是需要阻塞的,              // 阻塞的時候又沒有辦法向客戶端發送數據,所以讀寫獨立的話,肯定是要起執行緒的                //起一個執行緒,專門用於              // 讀Server 端 發來的數據,數據一過來就讀然後輸出,              // 輸出服務端發送的數據              new Thread() {                  @Override                  public void run() {                        try {                          String line = null;                          while ((line = br.readLine()) != null) {                              System.out.println(line);                          }                      } catch (IOException e) {                      }                  }              }.start();                //給Server端 發送數據              while (true) {                  //next() 是阻塞的,不斷地讀控制面板,有數據就會通過bufferWriter,                  // 即outputStream 寫給Server                  String msg = mScanner.next();                  bw.write(msg);                  bw.newLine();                  bw.flush();              }            } catch (IOException e) {              e.printStackTrace();          }      }        public static void main(String[] args) {          new TcpClient().start();      }  }
  • 反覆測試:

移植客戶端到Android移動端

  • 複製TcpClient到biz包下迭代,名為TcpClientBiz:

  • rename一下MainActivity為UdpActivity:

複製UdpActivity一份,原地粘貼,命名為TcpActivity:


更改啟動頁面:

  • 反覆測試(一個模擬機和兩台真機的聊天哈哈哈):
  • 最終Server端聊天記錄:
服務端諸類程式碼實現概述(TcpServer、ClientTask、MsgPool)
  • TcpServer: 死循環,阻塞,等待客戶端請求連接,while (true) & .accept(); 一旦連接上,獲取對應的socket對象並 把它丟給ClientTask的構造方法,new ClientTask(socket) 直接新建一個子執行緒,去處理這個socket(.start()), 將連接上的socket 對應的執行緒類,註冊到消息隊列類中的隊列中, 成為消息隊列的觀察者;MsgPool.getInstance().addMsgComingListener(clientTask) 啟動消息隊列讀讀隊列的執行緒, MsgPool.getInstance().start();
  • ClientTask: public class ClientTask extends Thread implements MsgPool.MsgComingListener 讓執行緒類作為消息隊列的觀察者, 負責接收被觀察者的通知資訊並做socket 通訊; 類中:
    • 1/3 構造方法: 接收TcpServer對過來的socket對象, 用之初始化其IO流;
    • 2/3 run()<讀取Client的 I流,加入 MsgPool.mQueue> 封裝輸入流, 讀取客戶端發送過來的資訊並輸出: while ((line = br.readLine()) != null){...} System.out.println(...); 把資訊發送加入到消息隊列:MsgPool.getInstance().sendMsg(...); 如果沒有客戶端發送資訊, readLine() 便會阻塞(注意這裡會阻塞!所以要放在子執行緒!)在原地
    • 3/3 onMsgComing(String msg)<取出 MsgPool.mQueue,寫入Client的 O流> 作為消息隊列的觀察者對應的更新方法, 消息隊列中最新的消息會推送通知到這裡的msg參數, (消息隊列類有一個子執行緒死循環阻塞讀取隊頭, String msg = mQueue.take(); notifyMsgComing(msg); notifyMsgComing中遍歷所有已註冊的觀察者, 遍歷時調用觀察者的onMsgComing(msg) 正是本方法!!!) 本方法中拿到最新的推送消息後, 寫進輸出流, 發送給對應的 TCP 連接的客戶一端socket
  • class MsgPool消息列表類
    • 實現單例模式 private static MsgPool mInstance = new MsgPool(); public static MsgPool getInstance() { return mInstance; } private MsgPool() { }
    • 準備消息列表底層數據結構: private LinkedBlockingQueue<String> mQueue = new LinkedBlockingQueue<>();
    • sendMsg(String msg): 當有消息過來時,即客戶端接收到消息時, 就把消息發送(添加)到消息隊列中:mQueue.put(msg); ClientTaskrun()調用本方法!!!;
    • start() 啟動讀取消息的子執行緒,在後台不斷運行, 死循環 阻塞 讀取隊頭, 一有消息取出就通知所有已註冊的觀察者, String msg = mQueue.take(); notifyMsgComing(msg); 在TcpServer中一開始配置好服務ip和埠就調用了;
    • 實現被觀察者通知方法:notifyMsgComing(String msg) 實現被觀察者方法,添加觀察者到列表: public void addMsgComingListener(MsgComingListener listener) 觀察者介面MsgComingListener 被觀察者列表private List<MsgComingListener> mListeners = new ArrayList<>();
客戶端諸類程式碼實現概述(TcpClientBiz、TcpActivity)
  • TcpClientBiz: 連接Server端, 後檯子執行緒不斷接收Server端發送過來的資訊, 前台封裝、提供向Server端發送資訊的方法
    • 準備一個綁定了mainLooperHandler
    • 定義<回調機制> 回調介面及其抽象方法; 聲明 全局 回調介面變數; 回調介面置入函數; setOnMsgComingListener(onMsgComingListener listener) { mListener = listener; }
    • 構造方法:
      • 開啟子執行緒!!!, 配置連接到Server端的socket;mSocket = new Socket("***.**.*.**", 9090);
      • 通過socket獲得IO流; (以上,socket,IO流都初始化給全局變數)
      • 使用全局 回調介面變數, 抽象調用業務方法;(Toast提醒、Error處理之類)
      • 調用readServerMsg()!!!;
    • readServerMsg() 一旦本類被實例化,就會被啟動!!! 開啟一個子執行緒, 拿著全局變數I流,封裝成BufferReader, 死循環 阻塞等待 讀取Server端資訊 while ((line = br.readLine()) != null) 一旦有資訊, 藉助Handler.post(), 使用全局 回調介面變數抽象調用介面方法onMsgComing() 通過回調機制交給Activity層處理;
    • sendMsg(final String msg) 開啟一個子執行緒, 拿著全局變數O流,封裝成BufferWriter, 把參數msg 寫入BufferWriter(O流),發送給Server端; 調用時機:在要發送消息給Server 的時候調用, 一般是EditText 右邊的按鈕被點擊的時候
    • onDestroy(): 回收socket、IO流
  • TcpActivity 主要是各種組件的配置, 注意幾點即可:
    • 需要實例化一個全局TcpClientBiz實例 然後用匿名內部類實現回調介面及其方法, 再set 給TcpClientBiz實例;
    • 點擊按鈕時把EditText的內容發送給Server端; msg = mEtMsg.getText().toString(); mTcpClientBiz.sendMsg(msg);
    • onDestroy()中調用mTcpClientBiz.onDestroy();回收資源

所有的客戶端都可發送消息到隊列中, 然後所有的客戶端都在等待 消息隊列的消息新增(mQueue.put())這個時刻, 消息隊列一新增消息, 即一接收到某個客戶端發送過來消息(mQueue.put()), 則消息都會一次性轉發給所有客戶端, 所以這裡涉及到一個觀察者設計模式, 消息隊列(MsgPool)或消息(Msg)是被觀察者, 所有客戶端處理執行緒(ClientTask)都是觀察者

觀察者模式實現小結: 觀察者介面準備更新(數據或UI的)方法; 被觀察者介面準備三個抽象方法; 觀察者實現類具體實現更新邏輯,可以有參數,參數為更新需要的數據; 被觀察者實現類準備一個觀察者List以及實現三個方法: 1.觀察者註冊方法: 參數為某觀察者,功能是把觀察者參數加到觀察者List中; 2.註銷觀察者方法: 參數為某觀察者,功能是把觀察者參數從觀察者List中移除; 3.通知觀察者方法:無參數或者把需要通知的數據作為參數, 功能是遍歷所有已註冊的觀察者, 即遍歷 註冊添加到 觀察者List中的觀察者,逐個調用List中所有觀察者的更新方法;即一次性更新所有已註冊的觀察者! 使用時, 實例化一個被觀察者和若干個觀察者, 將所有觀察者註冊到被觀察者處, 調用被觀察者的通知方法,一次性更新所有已註冊的觀察者!