Dubbo系列之 (七)鏈路層那些事(1)

輔助鏈接

Dubbo系列之 (一)SPI擴展

Dubbo系列之 (二)Registry註冊中心-註冊(1)

Dubbo系列之 (三)Registry註冊中心-註冊(2)

Dubbo系列之 (四)服務訂閱(1)

Dubbo系列之 (五)服務訂閱(2)

Dubbo系列之 (六)服務訂閱(3)

Dubbo系列之 (七)鏈路層那些事(1)

在講解dubboTCP端的設計時,先了解下一些類的關係圖。它們是如何組織在一起的,每個功能又是什麼,接著在進一步深入了解其內涵。

類簡介

1、Exchangers(交換器工具類) 用來創建TCP服務(bind)和建立客戶端連接(connect)輔助類

2、Transporters(數據流傳輸工具類)用來創建TCP服務(bind)和建立客戶端連接(connect)輔助類,Exchangers的底層內容依賴於Transporters,並且Transporters會根據SPI擴展,來適配合適的tcp通訊框架,比如netty,mina等。

3、Exchanger(交換器) 用來創建TCP鏈接,通過工具類Exchangers完成,該介面是一個SPI擴展,目前唯一僅有就是HeaderExchanger。從名字的含義可以得到,該協議是具有自定義協議頭的交換器,所以取名HeaderExchanger。

4、Transporter(數據傳輸層) 用來創建TCP連接,通過工具類Transporters完成。它也是一個SPI擴展,比如NettyTransporter,MinaTransporter。

5、ExchangeClient (交換器客戶端),Exchanger的connect()方法返回,即建立了TCP連接後,返回的客戶端,接著就是通過該客戶端與服務端通訊,實例有HeaderExchangeClient、LazyConnectExchangeClient、ReferenceCountExchangeClient。之後分別講解這3個,Exchangers工具類建立的連接客戶端是HeaderExchangeClient。

6、ExchangeServer (交換器服務端端) Exchanger的bind()方法返回,即服務端監聽的服務端實例,它監聽這某個具體的tcp埠。默認實現是HeaderExchangeServer。

7、RemotingServer(遠程的TCP服務端),ExchangeServer類也實現了該介面,代表其也是一個遠程伺服器,具體的實現有NettyServer,由Transporter的bind()方法返回,具體的Transporter返回相應的遠程服務端。比如NettyTransporter#bind()返回NettyServer。

8、Client(TCP客戶端),ExchangeClient類也實現了該介面,代表其也是一個TCP客戶端,具體實現有NettyClient,由Transporter的connect()方法返回,具體的Transporter返回相應的TCP客戶端。比如NettyTransporter#connect()返回NettyClient。

9、Channel (通訊通道) ,每建立一個TCP鏈接就相應創建一個Channel。比如Netty建立連接後,就有一個Channel。這裡的Channel指的是dubbo自己定義的一個channel。它與netty的channel建立關聯,通過NettyChannel類,框架操作的是NettyChannel,而NettyChannel內部持有一個netty的channel對象。

10、HeaderExchangeChannel(交換器Channel,ExchangeChannel屬於交換器Channel),它被HeaderExchangeClient客戶端所持有,客戶端就是通過HeaderExchangeChannel進行通訊的,HeaderExchangeChannel內部持有一個具體的Channel。

11、ChannelHandler (通道處理器) 用來處理建立連接、發送請求、結束請求等操作的具體抽象。

12、ChannelHandlers(通道處理器工具類) 主要用來包裹封裝具體的Channel,它的作用是通過消息類型,根據Dispatcher返回不同的

13、Dispatcher(消息派發器)

類型 Dispatcher Channelhandler 作用
All AllDispatcher AllChannelHandler 所有的消息類型全部通過業務執行緒池處理
Connection ConnectionOrderedDispatcher ConnectionOrderedChannelHandler 連接、斷開消息單獨通過一個執行緒池池來處理,其他的讀寫等消息通過業務執行緒池處理
Direct DirectDispatcher DirectChannelHandler 所有的消息都通過IO執行緒池處理,不放到業務執行緒池中
Execution ExecutionDispatcher ExecutionChannelHandler 請求消息在業務執行緒池處理,其他消息在IO執行緒池。
Message MessageOnlyDispatcher MessageOnlyChannelHandler 請求和響應消息在業務執行緒池處理,其他心跳,連接等消息在IO執行緒池處理

類關係圖

試一把,Netty操作–客戶端多執行緒,單鏈路(TCP)

1、定義傳輸消息

@Data
@ToString
public class SampleMessage {

    private String threadName;
    
    private String id;

    private String desc;
}

2、編寫編碼器

public class SampleEncoder extends MessageToByteEncoder<SampleMessage> {

    protected void encode(ChannelHandlerContext channelHandlerContext, SampleMessage sampleMessage, ByteBuf byteBuf) throws Exception {

        String threadName = sampleMessage.getThreadName();
        String id = sampleMessage.getId();
        String desc = sampleMessage.getDesc();

        byteBuf.writeInt(threadName.getBytes().length);
        byteBuf.writeBytes(threadName.getBytes());

        byteBuf.writeInt(id.getBytes().length);
        byteBuf.writeBytes(id.getBytes());


        byteBuf.writeInt(desc.getBytes().length);
        byteBuf.writeBytes(desc.getBytes());
        String str = sampleMessage.getThreadName() + ":" + sampleMessage.getDesc() + ":" + sampleMessage.getId();

        System.out.println(str);
    }
}

3、編寫解碼器

public class SampleDecoder extends ByteToMessageDecoder {

    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {

        byteBuf.markReaderIndex();

        String threadName = read(byteBuf);
        if (threadName == null) {
            byteBuf.resetReaderIndex();
            return;
        }

        String id = read(byteBuf);
        if (id == null) {
            byteBuf.resetReaderIndex();
            return;
        }

        String desc = read(byteBuf);
        if (desc == null) {
            byteBuf.resetReaderIndex();
            return;
        }

        SampleMessage sampleMessage = new SampleMessage();
        sampleMessage.setId(id);
        sampleMessage.setThreadName(threadName);
        sampleMessage.setDesc(desc);
        list.add(sampleMessage);
    }

    private String read(ByteBuf byteBuf) {
        if (canReadInt(byteBuf)) {
            int readInt = byteBuf.readInt();
            if (canReadN(byteBuf, readInt)) {
                byte[] bytes = new byte[readInt];
                byteBuf.readBytes(bytes);
                return new String(bytes);
            } 
        }
        return null;
    }


    private boolean canReadInt(ByteBuf byteBuf) {
        return canReadN(byteBuf, 4);
    }

    private boolean canReadN(ByteBuf byteBuf, int n) {
        if (!byteBuf.isReadable()) {
            return false;
        }
        return byteBuf.readableBytes() >= n;
    }
}

4、編寫消息處理器

public class PrintChannelHandlers extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof SampleMessage) {
            SampleMessage sampleMessage = (SampleMessage) msg;
            System.out.println(sampleMessage.getThreadName() + ":" + sampleMessage.getId() + ":" + sampleMessage.getDesc());
        }
    }
    
}

5、編寫服務端

public class NettyServerMain {

    public static void main(String[] args) {
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(new NioEventLoopGroup(1), new NioEventLoopGroup(12))
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ch.pipeline()
                               // .addLast("log",new LoggingHandler(LogLevel.INFO))
                                .addLast("decoder", new SampleDecoder())
                                .addLast("encoder", new SampleEncoder())
                                .addLast("handler", new PrintChannelHandlers());
                    }
                });

        ChannelFuture channelFuture = serverBootstrap.bind(8888);
        channelFuture.syncUninterruptibly();
        System.out.println("鏈接前");
        Channel channel = channelFuture.channel();
        System.out.println("鏈接後");
    }
}

6、編寫客戶端

public class NettyClientMain {
    public static void main(String[] args) {
        NettyClientMain nettyClientMain = new NettyClientMain();
        nettyClientMain.open();
    }

    public void open() {
        Bootstrap bootstrap = new Bootstrap();
        bootstrap = new Bootstrap();
        bootstrap.group(new NioEventLoopGroup(10))
                .option(ChannelOption.SO_KEEPALIVE, true)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .channel(NioSocketChannel.class);

        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {

                ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                        .addLast("decoder", new SampleDecoder())
                        .addLast("encoder", new SampleEncoder());
                //.addLast("handler", new PrintChannelHandlers());

            }
        });

        SocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 8888);

        ChannelFuture future = bootstrap.connect(socketAddress);
        boolean ret = future.awaitUninterruptibly(3000, MILLISECONDS);

        if (ret && future.isSuccess()) {
            Channel newChannel = future.channel();
            doProcess(newChannel);
        }
    }

    private void doProcess(Channel channel) {

        AtomicLong atomicLong = new AtomicLong();
        for (int i = 0; i < 15; i++) {
            final char ch = (char) (i + 65);
            final String id = "id" + i;
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        SampleMessage sampleMessage = new SampleMessage();
                        sampleMessage.setThreadName(Thread.currentThread().getName());
                        sampleMessage.setDesc(getdes(ch));
                        sampleMessage.setId("id" + sampleMessage.getDesc().length() + "-" + atomicLong.getAndIncrement());
                        channel.writeAndFlush(sampleMessage);
                    }
                }
            });
            t.start();
        }
    }


    private String getdes(char a) {
        Random random = new Random();
        StringBuffer buffer = new StringBuffer();
        for (int i = 0; i < random.nextInt(500) + 1; i++) {
            buffer.append(a);
        }
        return buffer.toString();
    }
}

7、測試結果

結果符合預期,dubbo 也是通過服務底層公用一條TCP鏈接,多執行緒進行調用該鏈路channel。

Tags: