我想到的Netty功能,別人已經實現了

  • 2019 年 12 月 31 日
  • 筆記

Netty是年初最先學習的框架,近期的圖書租借系統想要用上聊天功能,實現一對一對話聊天,在用戶登錄服務端時,獲取用戶ip與id綁定,放入channelgroup,每次循環遍歷這個ip對應的channel,否則返回false,在用戶狀態取到是否在線,在線狀態屬於handler消息功能。

1.一對一聊天

2.展示在線人數

3.登錄驗證

4.輸入框優化

5.可支援表情

開源萬歲

當尋找Netty一對一聊天功能實現的時候,除了重溫下功能點,還能發現新的東西,一個小小的demo,別人卻不是這麼做的,細化到驗證,還考慮到性能,看下實現效果頁面

需要昵稱輸入,登錄後不是簡單的數據新增

 private static ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(true);   private static ConcurrentMap<Channel, UserInfo> userInfos = new ConcurrentHashMap<>();   private static AtomicInteger userCount = new AtomicInteger(0);

讀寫鎖;

並發包;

原子計數;

因為最終實現用戶的在線人數統計,用了原子類的AtomicInteger

 private static AtomicInteger uidGener = new AtomicInteger(1000);        private boolean isAuth = false; // 是否認證      private long time = 0;  // 登錄時間      private int userId;     // UID      private String nick;    // 昵稱      private String addr;    // 地址      private Channel channel;// 通道

登錄用戶資訊,確定昵稱,獲取ip,每一個用戶ip,channel一一對應放入channelGroup。

如何發送消息?

在消息處理Handler中,重寫channelread0方法

1.獲取是否存在此用戶資訊的channel

2.存在將用戶id,昵稱,接收到的消息廣播到頁面顯示

3.消息不為null,讀寫鎖加鎖,找到當前用戶的channel

4.遍歷用戶資訊,通過netty寫入回調返回

 if (msg instanceof FullHttpRequest) {              handleHttpRequest(ctx, (FullHttpRequest) msg);          } else if (msg instanceof WebSocketFrame) {              handleWebSocket(ctx, (WebSocketFrame) msg);          }

將Handler接受接入到Server

1.定時關閉失效的channel

2.定時向客戶端ping消息

Netty的消息處理流程

定義好父子執行緒組–>在childInitializer定義好相關處理通道處理器–>自己提供處理器在回調

現在開始回調

.childHandler(new ChannelInitializer<SocketChannel>() {                        @Override                      protected void initChannel(SocketChannel ch) throws Exception {                         將group放入消息管道                          ch.pipeline().addLast(defLoopGroup,                                  //請求解碼器                                  new HttpServerCodec(),                                  //將多個消息轉換成單一的消息對象                                  new HttpObjectAggregator(65536),                                  //支援非同步發送大的碼流,一般用於發送文件流                                  new ChunkedWriteHandler(),                                  //檢測鏈路是否讀空閑                                  new IdleStateHandler(60, 0, 0),                                  //處理握手和認證                                  new UserAuthHandler(),                                  //處理消息的發送                                  new MessageHandler()                          );                      }                  });

定時處理失效消息

 // 定時掃描所有的Channel,關閉失效的Channel              executorService.scheduleAtFixedRate(new Runnable() {                  @Override                  public void run() {                      logger.info("scanNotActiveChannel --------");                      UserInfoManager.scanNotActiveChannel();                  }              }, 3, 60, TimeUnit.SECONDS);                // 定時向所有客戶端發送Ping消息              executorService.scheduleAtFixedRate(new Runnable() {                  @Override                  public void run() {                      UserInfoManager.broadCastPing();                  }              }, 3, 50, TimeUnit.SECONDS);

前端頁面沒有什麼變化。

1.綁定websocket

2.websocket調用onmessage處理消息

window.socket = new WebSocket("ws://localhost:9688/websocket")

處理技術:

1.執行緒池

2.Lambad表達式

3.讀寫鎖提高性能

4.原子引用保證原子性,執行緒安全

debug

用戶登錄,判斷是否是給定的消息類型

獲取昵稱,用戶資訊,code碼

==success,保存用戶資訊

第一次登陸沒有此用戶id與channel,不廣播消息

用戶計數0 broadCastPing userCount: 0

當發送消息到頁面時

觸發定時消息廣播,遍歷size

處理消息ctx.fireChannelRead(frame.retain());

此時廣播用戶計數broadCastPing userCount: 1

websocket實現了真正意義上的客戶端與伺服器端的長連接,節省頻寬,而關注內容本身

websocket:1–:雙向數據傳遞;

2–:基於Http

3–:非瀏覽器場合