netty通訊
學習netty之前,要先了解作業系統中的IO、零拷貝(已經附上鏈接了)
一、netty的簡單介紹
- Netty 是由 JBOSS 提供的一個 Java 開源框架,現為 Github 上的獨立項目。
- Netty 是一個非同步的、基於事件驅動的網路應用框架,用以快速開發高性能、高可靠性的網路 IO 程式。
- Netty 主要針對在 TCP 協議下,面向 Client 端的高並發應用,或者 **Peer-to-Peer **場景下的大量數據持續傳輸的應用。
- Netty 本質是一個 NIO 框架,適用於伺服器通訊相關的多種應用場景。
- Dubbo 協議默認使用 Netty 作為基礎通訊組件,用於實現各進程節點之間的內部通訊
為什麼有了netty框架
原生的NIO存在問題:
- NIO的類庫和API繁雜:需要熟練掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer
- 要熟悉Java多執行緒編程,因為NIO編程涉及到Reactor模式
- Selector可能出現空輪詢,佔用CPU資源
netty是對NIO進行封裝,解決了上述問題:
- 設計優雅:
適用於各種傳輸類型的統一API,阻塞和非阻塞Socket;
基於靈活且可擴展的事件模型,可以清晰地分離關注點;
高度可訂製的執行緒模型
- 高性能、吞吐量更高:
延遲更低;
減少資源消耗;
最小化不必要的記憶體複製。
- 安全:完整的 SSL/TLS 和 StartTLS 支援。
- 社區活躍、不斷更新
二、執行緒模式
下面講解的是netty執行緒模式的由來
現有的執行緒模式:
- BIO的傳統阻塞IO服務模型
- NIO的Reactor模型
單 Reactor 單執行緒;
單 Reactor多執行緒;
主從 Reactor多執行緒
2.1 BIO的傳統阻塞IO服務模型
1、每個連接都需要獨立的執行緒完成數據的輸入,業務處理,數據返回。當並發數很大,就會創建大量的執行緒,佔用很大系統資源。
2、連接創建後,如果當前執行緒暫時沒有數據可讀,該執行緒會阻塞在 Handler對象中的read 操作,導致上面的處理執行緒資源浪費。
2.2 NIO的Reactor模型
基於I/O多路復用模型:多個客戶端進行連接,先把連接請求給Reactor,多個連接共用一個阻塞對象Reactor,由Reactor負責監聽和分發,當客戶端連接沒有數據時不會阻塞執行緒。
基於執行緒池復用執行緒資源:不必再為每個連接創建執行緒,將連接完成後的業務處理任務分配給執行緒進行處理,一個執行緒可以處理多個連接的業務。(解決了當並發數很大時,會創建大量執行緒,佔用很大系統資源)
Reactor模式中核心組成:
- Reactor:在一個單獨的執行緒中運行,負責監聽和分發事件,分發給適當的處理執行緒來對IO事件做出反應。
- Handlers:處理執行緒執行IO事件。
單Reactor單執行緒
特點:
- 該模型簡單沒有多執行緒的競爭,由一個執行緒完成所有的操作(監聽、分發、執行),沒有充分利用多核CPU
- 因為Reactor是單執行緒運行,因此在處理某個handler的IO事件時,其他的handler需要進行等待,等待時間長因為該執行緒還要處理業務;
- 當Reactor出現問題,就會造成業務模組不可用
上圖解析:
- Reactor對象通過select監控客戶端請求事件,收到事件後通過dispatch 進行分發
- 如果是建立連接請求事件,則由Acceptor通過accept處理連接請求,然後創建一個 Handler 對象處理連接完成後的後續業務處理
- 如果不是建立連接事件,則Reactor會分發處理執行緒來處理Handler的IO事件(完成 Read → 業務處理 → send 的完整業務流程)
單Reactor多執行緒
特點:
- 充分利用多核CPU,可能出現多執行緒競爭
- 因為Reactor是單執行緒運行,因此在處理某個handler的IO事件時,其他的handler需要進行等待,等待時間短因為處理業務交給worker執行緒池執行;
- Reactor承擔所有的事件的監聽和響應,它是單執行緒運行,在高並發場景容易出現性能瓶頸
- 當Reactor出現問題,就會造成業務模組不可用
上圖解析:
- Reactor對象通過select監控客戶端請求事件,收到事件後,通過Dispatch 進行分發
- 如果是建立連接請求事件,則由Acceptor通過accept處理連接請求,然後創建一個Handler對象處理完成連接後的各種事件
- 如果不是連接請求事件,則Reactor會分發處理執行緒來處理Handler的IO事件,handler只負責響應事件(read、send),不做具體的業務處理交給worker執行緒池執行(這樣不會使handler阻塞太久)
- worker執行緒池會分配獨立的worker執行緒完成真正的業務,並將結果返回給handler,handler收到響應後,通過send將結果返回給client
主從Reactor多執行緒
特點:
- 主Reactor可已有多個,從Reactor也可以有多個
- 主Reactor負責處理建立連接請求事件,從Reactor負責處理業務請求事件
- 當從Reactor出現問題,可以調用其他的從Reactor提供服務
上圖解析:
- Reactor主執行緒 MainReactor對象通過select監聽連接事件,收到事件後,通過Acceptor的accept處理連接事件
- 當Acceptor處理連接事件後,MainReactor將連接分配給SubReactor,subreactor將連接加入到連接隊列進行監聽,並創建handler進行各種事件處理
- 當有新事件發生時,subreactor就會分發處理執行緒來處理handler,handler通過read讀取數據,分發給後面的worker執行緒池處理。
- worker執行緒池分配獨立的worker執行緒進行業務處理,並返回結果。handler 收到響應的結果後,再通過send將結果返回給client
- Reactor主執行緒可以對應多個Reactor子執行緒,即MainRecator可以關聯多個 SubReactor
生活中的體現:
單 Reactor 單執行緒:前台接待員和服務員是同一個人,全程為顧客服務
單 Reactor 多執行緒:1 個前台接待員,多個服務員,接待員只負責接待
主從 Reactor 多執行緒:多個前台接待員,多個服務生
2.3 netty執行緒模型
netty執行緒模型主要是依據主從Reactor多執行緒
-
BossGroup中的NioEventLoop就像MainReactor可以有多個,WorkerGroup中的NioEventLoop就像SubReactor一樣可以有多個。
-
Netty抽象出兩組執行緒池,BossGroup專門負責接收客戶端的連接,WorkerGroup專門負責網路的讀寫
-
BossGroup和WorkerGroup類型都是NioEventLoopGroup,NioEventLoopGroup相當於一個事件循環組,這個組中含有多個事件循環,每一個事件循環是NioEventLoop(NioEventLoopGroup可以有多個執行緒,即可以含有多個NioEventLoop)
-
NioEventLoop表示一個不斷循環的執行處理任務的執行緒,每個 NioEventLoop都有一個Selector,用於監聽綁定在其上的socket的網路通訊
-
每個BossGroup下面的NioEventLoop循環執行的步驟有3步:
1、輪詢 accept 事件
2、處理 accept 事件,與 client 建立連接,生成NioSocketChannel,並將其註冊到某個WorkerGroup的NioEventLoop上的Selector
3、繼續處理任務隊列的任務,即runAllTasks -
每個WorkerGroup下面的NioEventLoop循環執行的步驟有3步:
1、輪詢 read,write 事件
2、處理 I/O 事件,即 read,write 事件,在對應NioSocketChannel處理
3、處理任務隊列的任務,即runAllTasks -
每個WorkerGroup的NioEventLoop處理業務時,會使用pipeline(管道),pipeline中包含了channel(通道),即通過pipeline可以獲取到對應通道,每個通道中都有一個channelPipeline維護了很多的處理器channelhandler。
netty案例-TCP服務
服務端:
NettyServer
NettyServer程式碼
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyServer {
public static void main(String[] args) throws Exception {
//創建BossGroup 和 WorkerGroup
//說明
//1. 創建兩個執行緒組 bossGroup 和 workerGroup
//2. bossGroup 只是處理連接請求 , 真正的和客戶端業務處理,會交給 workerGroup完成
//3. 兩個都是無限循環
//4. bossGroup 和 workerGroup 含有的子執行緒(NioEventLoop)的個數
// 默認實際 cpu核數 * 2
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8
try {
//創建伺服器端的啟動對象,配置參數
ServerBootstrap bootstrap = new ServerBootstrap();
//使用鏈式編程來進行設置
bootstrap.group(bossGroup, workerGroup) //設置兩個執行緒組
.channel(NioServerSocketChannel.class) //使用NioSocketChannel 作為伺服器的通道實現
.option(ChannelOption.SO_BACKLOG, 128) // 設置執行緒隊列等待連接個數
.childOption(ChannelOption.SO_KEEPALIVE, true) //設置保持活動連接狀態
// .handler(null) // 該 handler對應 bossGroup , childHandler 對應 workerGroup
.childHandler(new ChannelInitializer<SocketChannel>() {//創建一個通道初始化對象(匿名對象)
//給pipeline 設置處理器
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("客戶socketchannel hashcode=" + ch.hashCode()); //可以使用一個集合管理 SocketChannel, 再推送消息時,可以將業務加入到各個channel 對應的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue
ch.pipeline().addLast(new NettyServerHandler());
}
}); // 給我們的workerGroup 的 EventLoop 對應的管道設置處理器
System.out.println(".....伺服器 is ready...");
//綁定一個埠並且同步生成了一個 ChannelFuture 對象(也就是立馬返回這樣一個對象)
//啟動伺服器(並綁定埠)
ChannelFuture cf = bootstrap.bind(6668).sync();
//給cf 註冊監聽器,監控我們關心的事件
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (cf.isSuccess()) {
System.out.println("監聽埠 6668 成功");
} else {
System.out.println("監聽埠 6668 失敗");
}
}
});
//對關閉通道事件 進行監聽
cf.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
NettyServerHandler
NettyServerHandler程式碼
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPipeline;
import io.netty.util.CharsetUtil;
import java.util.concurrent.TimeUnit;
/*
說明
1. 我們自定義一個Handler 需要繼承netty 規定好的某個HandlerAdapter(規範)
2. 這時我們自定義一個Handler , 才能稱為一個handler
*/
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
//讀取數據事件(這裡我們可以讀取客戶端發送的消息)
/*
1. ChannelHandlerContext ctx:上下文對象, 含有 管道pipeline , 通道channel, 地址
2. Object msg: 就是客戶端發送的數據 默認Object
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("伺服器讀取執行緒 " + Thread.currentThread().getName() + " channle =" + ctx.channel());
System.out.println("server ctx =" + ctx);
System.out.println("看看channel 和 pipeline的關係");
Channel channel = ctx.channel();
ChannelPipeline pipeline = ctx.pipeline(); //本質是一個雙向鏈表
//將 msg 轉成一個 ByteBuf
//ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
ByteBuf buf = (ByteBuf) msg;
System.out.println("客戶端發送消息是:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("客戶端地址:" + channel.remoteAddress());
}
//數據讀取完畢
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//writeAndFlush 是 write + flush
//將數據寫入到快取,並刷新
//一般講,我們對這個發送的數據進行編碼
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客戶端~(>^ω^<)喵1", CharsetUtil.UTF_8));
}
//發生異常後, 一般是需要關閉通道
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
客戶端:
NettyClient
NettyClient程式碼
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
public class NettyClient {
public static void main(String[] args) throws Exception {
//客戶端需要一個事件循環組
EventLoopGroup group = new NioEventLoopGroup();
try {
//創建客戶端啟動對象
//注意客戶端使用的不是 ServerBootstrap 而是 Bootstrap
Bootstrap bootstrap = new Bootstrap();
//設置相關參數
bootstrap.group(group) //設置執行緒組
.channel(NioSocketChannel.class) // 設置客戶端通道的實現類(反射)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyClientHandler()); //加入自己的處理器
}
});
System.out.println("客戶端 ok..");
//啟動客戶端去連接伺服器端
//關於 ChannelFuture 要分析,涉及到netty的非同步模型
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
//對關閉通道事件 進行監聽
channelFuture.channel().closeFuture().sync();
}finally {
group.shutdownGracefully();
}
}
}
NettyClientHandler
NettyClientHandler程式碼
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
//當通道就緒就會觸發該方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client " + ctx);
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8));
}
//當通道有讀取事件時,會觸發
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("伺服器回復的消息:" + buf.toString(CharsetUtil.UTF_8));
System.out.println("伺服器的地址: "+ ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
netty中的參數組件
1、Bootstrap、ServerBootstrap
Bootstrap 意思是引導,一個 Netty 應用通常由一個 Bootstrap 開始,主要作用是配置整個 Netty 程式,串聯各個組件,Netty 中 Bootstrap 類是客戶端程式的啟動引導類,ServerBootstrap 是服務端啟動引導類。
2、Future、ChannelFuture
Netty 中所有的 IO 操作都是非同步的,不能立刻得知消息是否被正確處理。但是可以立即返回一個ChannelFuture,它可以註冊一個監聽,當操作執行成功或失敗時監聽會自動觸發註冊的監聽事件
- Channel channel(),返回當前正在進行 IO 操作的通道
- ChannelFuture sync(),等待非同步操作執行完畢,同步執行註冊的監聽事件
Future-Listener 機制
當Future對象剛剛創建時,處於非完成狀態,調用者可以通過返回的 ChannelFuture來獲取操作執行的狀態,註冊監聽函數來執行完成後的操作。
- 通過 isDone 方法來判斷當前操作是否完成;
- 通過 isSuccess 方法來判斷已完成的當前操作是否成功;
- 通過 getCause 方法來獲取已完成的當前操作失敗的原因;
- 通過 isCancelled 方法來判斷已完成的當前操作是否被取消;
- 通過 addListener 方法來註冊監聽器,當操作已完成(isDone方法返回完成),將會通知指定的監聽器;
3、Channel
- Netty 網路通訊的組件,能夠用於執行網路 I/O 操作。
- Netty 網路通訊的組件,能夠用於執行網路 I/O 操作。
- Channel 提供非同步的網路 I/O 操作(如建立連接,讀寫,綁定埠),非同步調用意味著任何 I/O 調用都將立即返回一個ChannelFuture,並且不保證在調用結束時所請求的 I/O 操作已完(後期通過ChannelFuture的方法查看非同步執行結果)
- 不同協議、不同的阻塞類型的連接都有不同的 Channel 類型與之對應
NioSocketChannel,非同步的客戶端 TCP Socket 連接。
NioServerSocketChannel,非同步的伺服器端 TCP Socket 連接。
NioDatagramChannel,非同步的 UDP 連接。
4、Selector
- Netty 基於 Selector 對象實現 I/O 多路復用,通過 Selector 一個執行緒可以監聽多個連接的 Channel 事件。
- 當向一個 Selector 中註冊 Channel 後,Selector 內部的機制就可以自動不斷地查詢(Select)這些註冊的 Channel 是否有已就緒的 I/O 事件(例如可讀,可寫,網路連接完成等),這樣程式就可以很簡單地使用一個執行緒高效地管理多個 Channel
5、ChannelHandler 及其實現類
- ChannelHandler 是一個介面,處理 I/O 事件或攔截 I/O 操作,並將其轉發到其 ChannelPipeline(業務處理鏈)中的下一個處理程式。
- ChannelHandler 本身並沒有提供很多方法,因為這個介面有許多的方法需要實現,方便使用期間,可以繼承它的子類
- 當自定義一個handler類處理器時,需要繼承ChannelhandlerAdapter
6、Pipeline 和 ChannelPipeline
- Pipeline中包含了多個Channel(通道)
- 一個Channel包含了一個ChannelPipeline,而ChannePipeline中又維護了一個由ChannelHandlerContext組成的雙向鏈表,並且每個channeHandlerContext中又關聯著一個channelHandler
- ChannelPipeline是保存ChannelHandler的List,用於處理或攔截Channel 的入站事件和出站事件操作
- 入站事件和出站事件在一個雙向鏈表中,入站事件會從鏈表head往後傳遞到最後一個入站的 handler,出站事件會從鏈表tail往前傳遞到最前t個出站的handler, c兩種類型的handler互不干擾
- ChannelPipeline實現了一種高級形式的攔截過濾器模式,使用戶可以完全控制事件的處理方式
ChannelPipeline addFirst(ChannelHandler... handlers),把一個業務處理類(handler)添加到鏈中的第一個位置
ChannelPipeline addLast(ChannelHandler... handlers),把一個業務處理類(handler)添加到鏈中的最後一個位置
7、ChannelHandlerContext
- 保存 Channel 相關的所有上下文資訊,同時關聯一個 ChannelHandler 對象
- 即ChannelHandlerContext中包含一個具體的事件處理器ChannelHandler,同時ChannelHandlerContext中也綁定了對應的pipeline 和 Channel 的資訊,方便對ChannelHandler進行調用。
ChannelHandlerContext的常用方法:
1、ChannelFuture close(),關閉通道
2、ChannelOutboundInvoker flush(),刷新
3、ChannelFuture writeAndFlush(Object msg),將數據寫到ChannelPipeline 中當前 ChannelHandler 的下一個ChannelHandler 開始處理(出站)
8、ChannelOption
Netty在創建Channel實例後,一般都需要設置 ChannelOption 參數
9、EventLoopGroup 和其實現類 NioEventLoopGroup
- EventLoopGroup 是一組 EventLoop 的抽象,Netty 為了更好的利用多核 CPU 資源,一般會有多個 EventLoop 同時工作,每個 EventLoop 維護著一個 Selector 實例。
- EventLoopGroup 提供 next 介面,可以從組裡面按照一定規則獲取其中一個 EventLoop 來處理任務。在 Netty 伺服器端編程中,我們一般都需要提供兩個 EventLoopGroup,例如:BossEventLoopGroup 和 WorkerEventLoopGroup。
- BossEventLoop負責接收客戶端的連接並將SocketChannel註冊到 WorkerEventLoopGroup的其中一個workerEventLoop的selector上並進行後續的IO事件處理
10、Unpooled類
- Netty提供一個專門用來操作緩衝區(即 Netty 的數據容器)的工具類
ByteBuf buffer = Unpooled.buffer(10);
ByteBuf byteBuf = Unpooled.copiedBuffer(“hello,world!”, Charset.forName(“utf-8”));
netty的案例-群聊系統
GroupChatServer
GroupChatServer程式碼
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class GroupChatServer {
private int port; //監聽埠
public GroupChatServer(int port) {
this.port = port;
}
//編寫run方法,處理客戶端的請求
public void run() throws Exception{
//創建兩個執行緒組
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(); //8個NioEventLoop
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//獲取到pipeline
ChannelPipeline pipeline = ch.pipeline();
//向pipeline加入解碼器
pipeline.addLast("decoder", new StringDecoder());
//向pipeline加入編碼器
pipeline.addLast("encoder", new StringEncoder());
//加入自己的業務處理handler
pipeline.addLast(new GroupChatServerHandler());
}
});
System.out.println("netty 伺服器啟動");
ChannelFuture channelFuture = b.bind(port).sync();
//監聽關閉
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new GroupChatServer(7000).run();
}
}
GroupChatServerHandler
GroupChatServerHandler程式碼
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {
//這樣寫還要自己遍歷Channel
//public static List<Channel> channels = new ArrayList<Channel>();
//使用一個hashmap 管理私聊(私聊本案例並未實現,只是提供個思路)
//public static Map<String, Channel> channels = new HashMap<String,Channel>();
//定義一個channle 組,管理所有的channel
//GlobalEventExecutor.INSTANCE) 是全局的事件執行器,是一個單例
private static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
//handlerAdded 表示連接建立,一旦連接,第一個被執行
//將當前channel 加入到 channelGroup
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
//將該客戶加入聊天的資訊推送給其它在線的客戶端
//該方法會將 channelGroup 中所有的channel 遍歷,並發送消息,我們不需要自己遍歷
channelGroup.writeAndFlush("[客戶端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new java.util.Date()) + " \n");
channelGroup.add(channel);
//私聊如何實現
// channels.put("userid100",channel);
}
//斷開連接, 將xx客戶離開資訊推送給當前在線的客戶
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
channelGroup.writeAndFlush("[客戶端]" + channel.remoteAddress() + " 離開了\n");
System.out.println("channelGroup size" + channelGroup.size());
}
//表示channel 處於活動狀態, 提示 xx上線
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//這個是給服務端看的,客戶端上面已經提示xxx加入群聊了
System.out.println(ctx.channel().remoteAddress() + " 上線了~");
}
//表示channel 處於不活動狀態, 提示 xx離線了
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println(ctx.channel().remoteAddress() + " 離線了~");
}
//讀取數據,轉發給在線的每一個客戶端
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
//獲取到當前channel
Channel channel = ctx.channel();
//這時我們遍歷channelGroup, 根據不同的情況,回送不同的消息
channelGroup.forEach(ch -> {
if(channel != ch) { //不是當前的channel,轉發消息
ch.writeAndFlush("[客戶]" + channel.remoteAddress() + " 發送了消息" + msg + "\n");
}else {//回顯自己發送的消息給自己
ch.writeAndFlush("[自己]發送了消息" + msg + "\n");
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//關閉通道
ctx.close();
}
}
GroupChatClient
GroupChatClient程式碼
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.util.Scanner;
public class GroupChatClient {
//屬性
private final String host;
private final int port;
public GroupChatClient(String host, int port) {
this.host = host;
this.port = port;
}
public void run() throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//得到pipeline
ChannelPipeline pipeline = ch.pipeline();
//加入相關handler
pipeline.addLast("decoder", new StringDecoder());
pipeline.addLast("encoder", new StringEncoder());
//加入自定義的handler
pipeline.addLast(new GroupChatClientHandler());
}
});
ChannelFuture channelFuture = bootstrap.connect(host, port).sync();
//得到channel
Channel channel = channelFuture.channel();
System.out.println("-------" + channel.localAddress()+ "--------");
//客戶端需要輸入資訊,創建一個掃描器
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLine()) {
String msg = scanner.nextLine();
//通過channel 發送到伺服器端
channel.writeAndFlush(msg + "\r\n");
}
}finally {
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new GroupChatClient("127.0.0.1", 7000).run();
}
}
GroupChatClientHandler
GroupChatClientHandler程式碼
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
public class GroupChatClientHandler extends SimpleChannelInboundHandler<String> {
//從伺服器拿到的數據
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(msg.trim());
}
}
netty的心跳檢測機制
IdleStateHandler是netty 提供的處理空閑狀態的處理器(放在ChannelPipeline維護的ChannelHandler的雙向鏈表上):
- long readerIdleTime : 表示多長時間沒有讀, 就會發送一個心跳檢測包檢測是否連接
- long writerIdleTime : 表示多長時間沒有寫, 就會發送一個心跳檢測包檢測是否連接
- long allIdleTime : 表示多長時間沒有讀寫, 就會發送一個心跳檢測包檢測是否連接
netty的編解碼器
編解碼器放在ChannelPipeline維護的ChannelHandler的雙向鏈表上
服務端發數據給客戶端:服務端—>出站(編碼)—>Socket通道—>入站(解碼)—>客戶端
客戶端發數據給服務端:客戶端—>出站(編碼)—>Socket通道—>入站(解碼)—>服務端
編解碼器(自定義編解碼器時需要繼承下面的其中一個類):
- ByteToMessageDecoder
- LineBasedFrameDecoder:這個類在 Netty 內部也有使用,它使用行尾控制字元(\n或者\r\n)作為分隔符來解析數據。
- DelimiterBasedFrameDecoder:使用自定義的特殊字元作為消息的分隔符。
- HttpObjectDecoder:一個 HTTP 數據的解碼器
- LengthFieldBasedFrameDecoder:通過指定長度來標識整包消息,這樣就可以自動的處理黏包和半包消息。
TCP 粘包和拆包及解決方案
TCP粘包和拆包
使用優化方法(Nagle 演算法),將多次間隔較小且數據量小的數據,合併成一個大的數據塊,然後進行封包。這樣做雖然提高了效率,但是接收端就難於分辨出完整的數據包了,出現了粘包和拆包的問題,因為面向流的通訊是無消息保護邊界的。
- 服務端分兩次讀取到了兩個獨立的數據包,分別是 D1 和 D2,沒有粘包和拆包
- 服務端一次接受到了兩個數據包,D1 和 D2 粘合在一起,稱之為 TCP 粘包
- 服務端分兩次讀取到了數據包,第一次讀取到了完整的 D1 包和 D2 包的部分內容,第二次讀取到了 D2 包的剩餘內容,這稱之為 TCP 拆包
- 服務端分兩次讀取到了數據包,第一次讀取到了 D1 包的部分內容 D1_1,第二次讀取到了 D1 包的剩餘部分內容 D1_2 和完整的 D2 包。
解決方案
使用自定義協議+編解碼器來解決,關鍵就是要解決伺服器端每次讀取數據長度的問題,這個問題解決,就不會出現伺服器多讀或少讀數據的問題,從而避免的 TCP 粘包、拆包。
RPC(基於netty)
1、RPC(Remote Procedure Call)遠程過程調用,是一個電腦通訊協議。該協議允許運行於一台電腦的程式調用另一台電腦的子程式,而程式設計師無需額外地為這個交互作用編程
2、兩個或多個應用程式都分布在不同的伺服器上,它們之間的調用都像是本地方法調用一樣
RPC的調用流程圖
- 服務消費方(client)以本地調用方式調用服務
- client stub 接收到調用後負責將方法、參數等封裝成能夠進行網路傳輸的消息體
- client stub 將消息進行編碼並發送到服務端
- server stub 收到消息後進行解碼
- server stub 根據解碼結果調用本地的服務
- 本地服務執行並將結果返回給 server stub
- server stub 將返回導入結果進行編碼並發送至消費方
- client stub 接收到消息並進行解碼
- 服務消費方(client)得到結果
RPC 的目標就是將 2 - 8 這些步驟都封裝起來,用戶無需關心這些細節,可以像調用本地方法一樣即可完成遠程服務調用