手動搭建I/O網路通訊框架2:BIO編程模型實現群聊

  • 2020 年 4 月 11 日
  • 筆記

第一章:手動搭建I/O網路通訊框架1:Socket和ServerSocket入門實戰,實現單聊

  在第一章中運用Socket和ServerSocket簡單的實現了網路通訊。這一章,利用BIO編程模型進行升級改造,實現群聊聊天室。

  

  如圖:當一個客戶端請求進來時,接收器會為這個客戶端分配一個工作執行緒,這個工作執行緒專職處理客戶端的操作。在上一章中,伺服器接收到客戶端請求後就跑去專門服務這個客戶端了,所以當其他請求進來時,是處理不到的。

  看到這個圖,很容易就會想到執行緒池,BIO是一個相對簡單的模型,實現它的關鍵之處也在於執行緒池。

  在上程式碼之前,先大概說清楚每個類的作用,以免弄混淆。更詳細的說明,都寫在注釋當中。

  伺服器端:

  ChatServer:這個類的作用就像圖中的Acceptor。它有兩個比較關鍵的全局變數,一個就是存儲在線用戶資訊的Map,一個就是執行緒池。這個類會監聽埠,接收客戶端的請求,然後為客戶端分配工作執行緒。還會提供一些常用的工具方法給每個工作執行緒調用,比如:發送消息、添加在線用戶等。

  ChatHandler:這個類就是工作執行緒的類。在這個項目中,它的工作很簡單:把接收到的消息轉發給其他客戶端,當然還有一些小功能,比如添加移除在線用戶。

  客戶端:

  相較於伺服器,客戶端的改動較小,主要是把等待用戶輸入資訊這個功能分到其他執行緒做,不然這個功能會一直阻塞主執行緒,導致無法接收其他客戶端的消息。

  ChatClient:客戶端啟動類,也就是主執行緒,會通過Socket和伺服器連接。也提供了兩個工具方法:發送消息和接收消息。

  UserInputHandler:專門負責等待用戶輸入資訊的執行緒,一旦有資訊鍵入,就馬上發送給伺服器。

  首先創建兩個包區分一下客戶端和伺服器,client和server

  伺服器端ChatServer:

public class ChatServer {      private int DEFAULT_PORT = 8888;      /**       * 創建一個Map存儲在線用戶的資訊。這個map可以統計在線用戶、針對這些用戶可以轉發其他用戶發送的消息       * 因為會有多個執行緒操作這個map,所以為了安全起見用ConcurrentHashMap       * 在這裡key就是客戶端的埠號,但在實際中肯定不會用埠號區分用戶,如果是web的話一般用session。       * value是IO的Writer,用以存儲客戶端發送的消息       */      private Map<Integer, Writer> map=new ConcurrentHashMap<>();      /**       * 創建執行緒池,執行緒上限為10個,如果第11個客戶端請求進來,伺服器會接收但是不會去分配執行緒處理它。       * 前10個客戶端的聊天記錄,它看不見。當有一個客戶端下線時,這第11個客戶端就會被分配執行緒,伺服器顯示在線       * 大家可以把10再設置小一點,測試看看       * */      private ExecutorService executorService= Executors.newFixedThreadPool(10);      //客戶端連接時往map添加客戶端      public void addClient(Socket socket) throws IOException {          if (socket != null) {              BufferedWriter writer = new BufferedWriter(                      new OutputStreamWriter(socket.getOutputStream())              );              map.put(socket.getPort(), writer);              System.out.println("Client["+socket.getPort()+"]:Online");          }      }        //斷開連接時map里移除客戶端      public void removeClient(Socket socket) throws Exception {          if (socket != null) {              if (map.containsKey(socket.getPort())) {                  map.get(socket.getPort()).close();                  map.remove(socket.getPort());              }              System.out.println("Client[" + socket.getPort() + "]Offline");          }      }        //轉發客戶端消息,這個方法就是把消息發送給在線的其他的所有客戶端      public void sendMessage(Socket socket, String msg) throws IOException {          //遍歷在線客戶端          for (Integer port : map.keySet()) {              //發送給在線的其他客戶端              if (port != socket.getPort()) {                  Writer writer = map.get(port);                  writer.write(msg);                  writer.flush();              }          }      }        //接收客戶端請求,並分配Handler去處理請求      public void start() {          try (ServerSocket serverSocket = new ServerSocket(DEFAULT_PORT)) {              System.out.println("Server Start,The Port is:"+DEFAULT_PORT);              while (true){                  //等待客戶端連接                  Socket socket=serverSocket.accept();                  //為客戶端分配一個ChatHandler執行緒                  executorService.execute(new ChatHandler(this,socket));              }          } catch (IOException e) {              e.printStackTrace();          }      }        public static void main(String[] args) {          ChatServer server=new ChatServer();          server.start();      }  }

  伺服器端ChatHandler:

public class ChatHandler implements Runnable {      private ChatServer server;      private Socket socket;        //構造函數,ChatServer通過這個分配Handler執行緒      public ChatHandler(ChatServer server, Socket socket) {          this.server = server;          this.socket = socket;      }        @Override      public void run() {          try {              //往map里添加這個客戶端              server.addClient(socket);              //讀取這個客戶端發送的消息              BufferedReader reader = new BufferedReader(                      new InputStreamReader(socket.getInputStream())              );              String msg = null;              while ((msg = reader.readLine()) != null) {                  //這樣拼接是為了讓其他客戶端也能看清是誰發送的消息                  String sendmsg = "Client[" + socket.getPort() + "]:" + msg;                  //伺服器列印這個消息                  System.out.println(sendmsg);                  //將收到的消息轉發給其他在線客戶端                  server.sendMessage(socket, sendmsg + "n");                  if (msg.equals("quit")) {                      break;                  }              }          } catch (IOException e) {              e.printStackTrace();          } finally {              //如果用戶退出或者發生異常,就在map中移除該客戶端              try {                  server.removeClient(socket);              } catch (Exception e) {                  e.printStackTrace();              }          }      }  }

  客戶端ChatClient:

public class ChatClient {      private BufferedReader reader;      private BufferedWriter writer;      private Socket socket;      //發送消息給伺服器      public void sendToServer(String msg) throws IOException {          //發送之前,判斷socket的輸出流是否關閉          if (!socket.isOutputShutdown()) {              //如果沒有關閉就把用戶鍵入的消息放到writer裡面              writer.write(msg + "n");              writer.flush();          }      }      //從伺服器接收消息      public String receive() throws IOException {          String msg = null;          //判斷socket的輸入流是否關閉          if (!socket.isInputShutdown()) {              //沒有關閉的話就可以通過reader讀取伺服器發送來的消息。注意:如果沒有讀取到消息執行緒會阻塞在這裡              msg = reader.readLine();          }          return msg;      }        public void start() {          //和服務創建連接          try {              socket = new Socket("127.0.0.1", 8888);              reader=new BufferedReader(                      new InputStreamReader(socket.getInputStream())              );              writer=new BufferedWriter(                      new OutputStreamWriter(socket.getOutputStream())              );              //新建一個執行緒去監聽用戶輸入的消息              new Thread(new UserInputHandler(this)).start();              /**               * 不停的讀取伺服器轉發的其他客戶端的資訊               * 記錄一下之前踩過的小坑:               * 這裡一定要創建一個msg接收資訊,如果直接用receive()方法判斷和輸出receive()的話會造成有的消息不會顯示               * 因為receive()獲取時,在返回之前是阻塞的,一旦接收到消息才會返回,也就是while這裡是阻塞的,一旦有消息就會進入到while裡面               * 這時候如果輸出的是receive(),那麼上次獲取的資訊就會丟失,然後阻塞在System.out.println               * */              String msg=null;              while ((msg=receive())!=null){                  System.out.println(msg);              }          } catch (IOException e) {              e.printStackTrace();          }finally {              try {                 if(writer!=null){                     writer.close();                 }              } catch (IOException e) {                  e.printStackTrace();              }          }      }        public static void main(String[] args) {          new ChatClient().start();      }  }

  客戶端UserInputHandler:

public class UserInputHandler implements Runnable {      private ChatClient client;        public UserInputHandler(ChatClient client) {          this.client = client;      }        @Override      public void run() {          try {              //接收用戶輸入的消息              BufferedReader reader = new BufferedReader(                      new InputStreamReader(System.in)              );              //不停的獲取reader中的System.in,實現了等待用戶輸入的效果              while (true) {                  String input = reader.readLine();                  //向伺服器發送消息                  client.sendToServer(input);                  if (input.equals("quit"))                      break;              }          } catch (IOException e) {              e.printStackTrace();          }      }  }

 

  運行測試:

  通過打開終端,通過javac編譯。如果大家是在IDEA上編碼的話可能會報編碼錯誤,在javac後面加上-encoding utf-8再接java文件就好了。

  編譯後運行,通過java運行時,又遇到了一個坑。會報找不到主類的錯誤,原來是因為加上兩個包,要在class文件名前面加上包名。比如當前在src目錄,下面有client和server兩個包,要這麼運行:java client.XXXX。可我之前明明在client文件夾下運行的java,也是不行,不知道為什麼。

  接著測試:

  1.首先在一個終端里運行ChatServer,打開伺服器

  2.在第二個終端里打開ChatClient,暫且叫A,此時伺服器的終端顯示:

  3.類似的,在第三個終端里打開ChatClient,暫且叫B,此時伺服器顯示:

  4.A中輸入hi,除了伺服器會列印hi外,B中也會顯示,圖片中的埠號和前面的不一樣,是因為中間出了點小問題,前三張截圖和後面的不是同時運行的。實際中同一個客戶端會顯示一樣的埠號:

  5.當客戶端輸入quit時就會斷開連接,最後,伺服器的顯示為: