netty無縫切換rabbitmq、activemq、rocketmq實現聊天室單聊、群聊功能

  • 2019 年 10 月 3 日
  • 筆記

file

file

netty的pipeline處理鏈上的handler:需要IdleStateHandler心跳檢測channel是否有效,以及處理登錄認證的UserAuthHandler和消息處理MessageHandler

protected void initChannel(SocketChannel ch) throws Exception {      ch.pipeline().addLast(defLoopGroup,          //編碼解碼器          new HttpServerCodec(),          //將多個消息轉換成單一的消息對象          new HttpObjectAggregator(65536),          //支援非同步發送大的碼流,一般用於發送文件流          new ChunkedWriteHandler(),          //檢測鏈路是否讀空閑,配合心跳handler檢測channel是否正常          new IdleStateHandler(60, 0, 0),          //處理握手和認證          new UserAuthHandler(),          //處理消息的發送          new MessageHandler()      );  }

對於所有連進來的channel,我們需要保存起來,往後的群發消息需要依靠這些channel

public static void addChannel(Channel channel) {          String remoteAddr = NettyUtil.parseChannelRemoteAddr(channel);          System.out.println("addChannel:" + remoteAddr);          if (!channel.isActive()) {              logger.error("channel is not active, address: {}", remoteAddr);          }          UserInfo userInfo = new UserInfo();          userInfo.setAddr(remoteAddr);          userInfo.setChannel(channel);          userInfo.setTime(System.currentTimeMillis());          userInfos.put(channel, userInfo);      }

登錄後,channel就變成有效的channel,無效的channel之後將會丟棄

public static boolean saveUser(Channel channel, String nick, String password) {          UserInfo userInfo = userInfos.get(channel);          if (userInfo == null) {              return false;          }          if (!channel.isActive()) {              logger.error("channel is not active, address: {}, nick: {}", userInfo.getAddr(), nick);              return false;          }          // 驗證用戶名和密碼          if (nick == null || password == null) {              return false;          }          LambdaQueryWrapper<Account> lambdaQueryWrapper = new LambdaQueryWrapper<>();          lambdaQueryWrapper.eq(Account::getUsername, nick).eq(Account::getPassword, password);          Account account = accountMapperStatic.selectOne(lambdaQueryWrapper);          if (account == null) {              return false;          }          // 增加一個認證用戶          userCount.incrementAndGet();          userInfo.setNick(nick);          userInfo.setAuth(true);          userInfo.setId(account.getId());          userInfo.setUsername(account.getUsername());          userInfo.setGroupNumber(account.getGroupNumber());          userInfo.setTime(System.currentTimeMillis());            // 註冊該用戶推送消息的通道          offlineInfoTransmitStatic.registerPull(channel);          return true;      }

當channel關閉時,就不再接收消息。unregisterPull就是註銷資訊消費者,客戶端不再接取聊天消息。此外,從下方有一個加寫鎖的操作,就是為了避免channel還在發送消息時,這邊突然關閉channel,這樣會導致報錯。

public static void removeChannel(Channel channel) {          try {              logger.warn("channel will be remove, address is :{}", NettyUtil.parseChannelRemoteAddr(channel));              //加上讀寫鎖保證移除channel時,避免channel關閉時,還有別的執行緒對其操作,造成錯誤              rwLock.writeLock().lock();              channel.close();              UserInfo userInfo = userInfos.get(channel);              if (userInfo != null) {                  if (userInfo.isAuth()) {                      offlineInfoTransmitStatic.unregisterPull(channel);                      // 減去一個認證用戶                      userCount.decrementAndGet();                  }                  userInfos.remove(channel);              }          } finally {              rwLock.writeLock().unlock();          }        }

為了無縫切換使用rabbitmq、rocketmq、activemq、不使用中間件存儲和轉發聊天消息這4種狀態,定義如下4個介面。依次是發送單聊消息、群聊消息、客戶端啟動接收消息、客戶端下線不接收消息。

public interface OfflineInfoTransmit {      void pushP2P(Integer userId, String message);        void pushGroup(String groupNumber, String message);        void registerPull(Channel channel);        void unregisterPull(Channel channel);  }

其中,如何使用rabbitmq、rocketmq、activemq三種中間件中的一種來存儲和轉發聊天消息,它的處理流程如下:

  1. 單聊的模型參考執行緒池的模型,如果用戶在線,直接通過channel發送給用戶。如果用戶離線,則發往中間件存儲,下次用戶上線時直接從中間件拉取消息。這樣做對比所有消息的發送都通過中間件來轉的好處是提升了性能
  2. 群聊則是完全通過中間件來轉發消息,消息發送中間件,客戶端從中間件接取消息。如果仍像單聊那樣操作,在線用戶直接通過channel發送,操作過於繁瑣,要判斷這個群組的哪些用戶是否在線
  3. 如果用戶在線就註冊消費者,從中間件接取消息。否則,就斷開消費者,消息保留在中間件中,以便客戶端下次上線時拉取。這樣就實現了離線消息的接收。
  4. 不管使用哪種中間件或使用不使用中間件,它的處理流程都遵循上面的3個要求,就能無縫切換上方的4種方法來存儲和轉發消息。需要哪種方法開啟相應註解即可。

file

項目地址:https://github.com/shuangyueliao/netty-chat