dubbo系列十一、dubbo transport層記錄

前言

dubbo接口方法重載且入參未顯式指定序列化id導致ClassCastException分析時候用到了dubbo的通信層和編解碼,dubbo有個transport層,默認使用netty4進行網絡通信,寫的非常好,dubbo的netty4可以直接作為基礎模塊作為我們項目的通信框架。但是由於dubbo要兼容mina、graazz、netty5等網絡通信,因此自定定義了一套Channel、ChannelHandler來適配不同的通信框架,我們看到時候容易搞混亂,實際我們只需要netty4而已,因此記錄下dubbo provider和consumer的tcp建立監聽和連接,方便排除問題,也用於以後使用到netty4的時候,直接可以遷移過去。

dubbo服務端和客戶端建立連接記錄

此xmind是針對dubbo系列一、dubbo啟動流程的補充,增加了dubbo tcp的監聽和連接,如下圖 dubbo netty客戶端&服務端啟動流程,有了此圖,以後遺忘了,也很容易回顧起來。

dubbo netty客戶端&服務端啟動流程

xmind下載地址

dubbo服務端

功能:監聽tcp端口,創建NettyServer對象,此對象表示tcp 服務端,持有dubbo channelhandler chain、客戶端連接的dubbo channel、監聽端口綁定的netty channel。NettyServer也是個ChannelHandler。

服務端netty pipeline 【HeadContext InternalDecoder InternalEncoder NettyServerHandler TailContext】

接收客戶端請求,觸發pipeline的channelRead事件,執行順序 HeadContext->InternalDecoder->InternalEncoder->NettyServerHandler->TailContext,僅執行netty pipeline inboud事件

響應客戶端請求,觸發pipeline的write事件,執行順序 TailContext->NettyServerHandler->InternalEncoder->InternalDecoder->HeadContext,僅執行netty pipeline outboud事件

其中InternalDecoder/InternalEncoder分別用作dubbo協議解碼/編碼

重要的是NettyServerHandler,是個inboud&outbound,持有dubbo channelhandler chain(即NettyServer,而NettyServer又持有dubbo channelhandler chain),維護的客戶端連接channel集合,而dubbo channel(即NettyChannel)又持有netty channel,在讀取請求時候,經過解碼後,把netty channel封裝為dubbo channel(即NettyChannel),然後由dubbo channelhandler chain鏈式處理dubbo channel,這樣針對不同的通信進行了統一封裝,最後由dubbo channel chain調用最終的目標對象。那麼我們擴展的話,也只是擴展dubbo channel chain而已。

這裡的dubbo channelhandler chain是[NettyServer->MultiMessageHandler->HeartbeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler->DubboProtocol$1],由NettyServerHandler觸發,每個channehandler職責不同,最終由 channelhandler DubboProtocol$1進行調用目標方法,執行業務邏輯。

dubbo客戶端

即netty client,連接服務端,和服務端保持長連接,對象是NettyClient,同時也是個ChannelHandler,持有netty channel,持有dubbo channelhandler chain【MultiMessageHandler->HeartbeatHandler->AllDispatcher->DecodeHandler->HeaderExchangeHandler->DubboProtocol$1】。

設計方式和nett server基本相同,也是通過netty pipeline觸發dubbo channel chain。

client 端netty pipeline 【HeadContext->InternalDecoder->InternalEncoder->NettyClientHandler->TailContext】

請求服務端,觸發pipeline的write事件,執行順序 TailContext->NettyClientHandler->InternalEncoder->InternalDecoder->HeadContext,僅執行netty pipeline outboud事件。

接收服務端響應,觸發pipeline的channelRead事件,執行順序 HeadContext->InternalDecoder->InternalEncoder->NettyClientHandler->TailContext,僅執行netty pipeline inboud事件。

重要的是NettyClientHandler,是個inboud&outbound,持有dubbo channelhandler,即NettyClient(持有dubbo channelhandler chain,netty channel),那麼也就持有dubbo channelhandler chain和netty channel,NettyClientHandler.handler即NettyClient->MultiMessageHandler->HeartbeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler->DubboProtocol$1

維護的客戶端連接channel集合,而dubbo channel(即NettyChannel)又持有netty channel,在讀取請求時候,經過解碼後,把netty channel封裝為dubbo channel(即NettyChannel),然後由dubbo channelhandler chain鏈式處理dubbo channel,這樣針對不同的通信進行了統一封裝,最後由dubbo channel chain調用最終的目標對象。那麼我們擴展的話,也只是擴展dubbo channel chain而已。

這裡的dubbo channelhandler chain是[NettyClient->MultiMessageHandler->HeartbeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler->DubboProtocol$1],由NettyClientHandler觸發,每個channehandler職責不同,最終由 channelhandler DubboProtocol$1進行調用目標方法,執行業務邏輯。

dubbo請求通信層完整流程

總結了下dubbo從客戶端到服務端的整個流程,包含IO線程和業務線程的切換,netty的執行,見下圖

image-20220120005303905

圖中的❶❷❸❹是IO線程和業務線程切換處,發生在netty pipeline的執行中,下面詳細說下

❶:客戶端發送數據,這裡觸發執行netty pipeline的outbound事件,執行順序TailContext->NettyClientHandler->InternalEncoder->InternalDecoder->HeadContext,其中在TailContext#writeAndFlush,封裝餘下pipeline及構造WriteAndFlushTask並添加到IO線程(netty work線程)的隊列taskQueue,然後業務線程發送執行完畢。同時由於IO線程對象NioEventLoop自旋,執行runAllTasks操作從隊列taskQueue取出WriteAndFlushTask任務執行,先執行write,再執行flush,因此要對應pipeline執行write & flush操作
即NettyClientHandler->InternalEncoder->InternalDecoder->HeadContext 先執行write操作,再執行flush操作

最後在HeadContext的write和flush操作內調用unsafe對象即NioSocketChannel$NioSocketChannelUnsafe(AbstractChannel.AbstractUnsafe).write(Object msg, ChannelPromise promise) & flush(),其中write是把數據寫到緩衝區,flush是發送數據到網卡

其中在執行NettyClientHandler的write操作時候,還會執行Dubbo channelHandler chain的sent操作,即NettyClient->MultiMessageHandler->HeartbeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler->DubboProtocol$1的sent操作,鉤子擴展,這裡並沒有實際功能。

NettyClientHandler的個netty channelhandler,持有了dubbo channelhandler chain【NettyClient->MultiMessageHandler->HeartbeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler->DubboProtocol$1】,即鉤子作用。

❷:服務端接收到客戶端數據,IO線程對象NioEventLoop自旋,執行processSelectedKeys操作,觸發netty pipeline的channelRead事件,執行順序HeadContext->InternalDecoder->InternalEncoder->NettyServerHandler->TailContext,先解碼,再通過NettyServerHandler把解碼結果提交給業務線程池處理,提交任務ChannelEventRunnable到業務線程池是由AllChannelHandler實現。

NettyServerHandler是netty channelhandler,持有了dubbo channelhandler chain【NettyServer->MultiMessageHandler->HeartbeatHandler->AllChannelHandler->DecodeHandler->HeaderExchangeHandler->DubboProtocol$1】,

❸:服務端響應客戶端,和❶基本相同,也是在TailContext#writeAndFlush進行線程切換,由業務線程切換到IO線程。這裡觸發的netty pipeline outbound事件,執行順序【TailContext->NettyServerHandler->InternalEncoder->InternalDecoder->HeadContext】,其中在NettyServerHandler內觸發dubbo channelhandler chain的sent操作,實際並無具體功能。

❹:客戶端接收服務端響應,和❷基本相同。netty pipeline執行inboud事件,執行順【HeadContext->InternalDecoder->InternalEncoder->NettyClientHandler->TailContext】,先解碼,再通過NettyClientHandler把解碼結果提交給業務線程池處理,提交任務ChannelEventRunnable到業務線程池是由AllChannelHandler實現。

注意:從上面看出,dubbo channelhandler chain是一部分在IO線程執行,一部分是被封裝到ChannelEventRunnable在業務線程執行。dubbo channelhandler是職責鏈,不同handler功能不同。

總結這個流程圖是為了更好的體會dubbo的整個請求流程,遇到問題參考此圖就很容易找到問題,還有就是理解dubbo對於netty的使用,從這個總結中也看出了netty通信開發的固定的套路,1.增加netty channelhandler進行編解碼,2.增加自定義netty channelhandler進行IO線程和業務線程的channelhandler就可以了。通常編解碼在IO線程執行,業務在業務線程池執行,通信上需要注意到粘包和拆包的處理。

dubbo動態代理類實際例子,方便遇到問題有實例參考分析

//com.alibaba.dubbo.common.bytecode.Wrapper2
package com.alibaba.dubbo.common.bytecode;

import com.alibaba.dubbo.common.bytecode.ClassGenerator;
import com.alibaba.dubbo.common.bytecode.NoSuchMethodException;
import com.alibaba.dubbo.common.bytecode.NoSuchPropertyException;
import com.alibaba.dubbo.common.bytecode.Wrapper;
import java.lang.reflect.InvocationTargetException;
import java.util.Map;
import org.pangu.api.ProductService;
import org.pangu.dto.ProductDTO;

public class Wrapper2 extends Wrapper implements ClassGenerator.DC {
	public static String[] pns;//PropertyNames
	public static Map pts;//Propertys
	public static String[] mns;//MethodNames
	public static String[] dmns;//DeclaredMethodNames
	public static Class[] mts0;
	public static Class[] mts1;
	public static Class[] mts2;

	@Override
	public String[] getPropertyNames() {
		return pns;
	}

	@Override
	public boolean hasProperty(String name) {
		return pts.containsKey(name);
	}

	public Class getPropertyType(String name) {
		return (Class) pts.get(name);
	}

	@Override
	public String[] getMethodNames() {
		return mns;
	}

	@Override
	public String[] getDeclaredMethodNames() {
		return dmns;
	}

	@Override
	public void setPropertyValue(Object object, String name, Object object2) {
		try {
			ProductService productService = (ProductService) object;
		} catch (Throwable throwable) {
			throw new IllegalArgumentException(throwable);
		}
		throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(name)
				.append("\" filed or setter method in class org.pangu.api.ProductService.").toString());
	}

	@Override
	public Object getPropertyValue(Object object, String name) {
		try {
			ProductService productService = (ProductService) object;
		} catch (Throwable throwable) {
			throw new IllegalArgumentException(throwable);
		}
		throw new NoSuchPropertyException(new StringBuffer().append("Not found property \"").append(name)
				.append("\" filed or setter method in class org.pangu.api.ProductService.").toString());
	}

	public Object invokeMethod(Object object, String name, Class[] classArray, Object[] objectArray)
			throws InvocationTargetException {
		ProductService productService;
		try {
			productService = (ProductService) object;
		} catch (Throwable throwable) {
			throw new IllegalArgumentException(throwable);
		}
		try {
			if ("findProduct".equals(name) && classArray.length == 1
					&& classArray[0].getName().equals("java.lang.String")) {
				return productService.findProduct((String) objectArray[0]);
			}
			if ("findProduct".equals(name) && classArray.length == 1
					&& classArray[0].getName().equals("org.pangu.dto.ProductDTO")) {
				return productService.findProduct((ProductDTO) objectArray[0]);
			}
			if ("selectProduct".equals(name) && classArray.length == 1) {
				return productService.selectProduct((ProductDTO) objectArray[0]);
			}
		} catch (Throwable throwable) {
			throw new InvocationTargetException(throwable);
		}
		throw new NoSuchMethodException(new StringBuffer().append("Not found method \"").append(name)
				.append("\" in class org.pangu.api.ProductService.").toString());
	}
}

dubbo通信重要接口ChannelHandler設計說明

從前面分析可以看出,dubbo也有一套Channel、ChannelHandler,netty也有Channel、ChannelHandler,而且和netty定義的同名。兩者區別是什麼呢?

netty Channel是對網絡通道的抽象,netty ChannelHandler是對Channel的處理器。

我的理解dubbo Channel也是對網絡通道的的抽象,持有netty Channel,兩者功能在抽象上是一樣的,但是dubbo為了兼容mina、Grizzly,因此dubbo抽象的Channel是針對這幾種通信框架的通道的抽象,做成一個統一模式,這些具體框架的通信變化並不會影響dubbo通信設計。

dubbo ChannelHandler是針對dubbo Channel的處理器,功能等同netty ChannelHandler。在dubbo中NettyClientHandler、NettyServerHandler分別是客戶端和服務端連接netty pipeline和dubbo ChannelHandler chain的橋樑。

dubbo ChannelHandler

這裡重要的是InternalEncoder/InternalDecoder、NettyServerHandler/NettyClientHandler

InternalEncoder/InternalDecoder:用於dubbo報文編解碼

NettyServerHandler/NettyClientHandler:連接netty和dubbo ChannelHandler的橋樑(分別作用在服務端和客戶端),這裡dubbo為了靈活擴展(比如支持mina等),增加了和netty同名的 ChannelHandler、Channel,因此看代碼的時候顯得有些混亂,容易搞蒙圈。

dubbo ChannelHandler接口定義處理5個事件,分別是連接、斷開、接收、發送、異常捕捉,是針對Channel的抽象。為什麼要這麼抽象呢?因為針對dubbo報文解碼後有許多靈活變化,比如心跳處理、派發(dispatcher)處理、處理Request/Response、線程池調用業務方法、處理NettyServer/NettyClient事件,這樣做是的目標是把netty ChannelHandler解耦,不需要實現大量的netty ChannelHandler來完成工作。只需要一個netty ChannelHandler(比如NettyServerHandler)就可以和dubbo ChannelHandler chain關聯起來,方便了dubbo的擴展。從我們上圖中分析,涉及到的一些dubbo ChannelHandler 作用如下:

dubbo ChannelHandler 作用
NettyServer 封裝netty服務端事件,處理連接、斷開、接收、發送、異常等事件
NettyClient 封裝netty客戶端事件,處理連接、斷開、接收、發送、異常等事件
MultiMessageHandler 支持流中多消息報文批處理
HeartbeatHandler 支持心跳處理
AllChannelHandler 支持dubbo線程池調用業務方法
DecodeHandler 支持在dubbo線程池中解碼
HeaderExchangeHandler 封裝處理Request/Response的調用能力
ExchangeHandlerAdapter 用於查找服務方法並調用

既然有了dubbo ChannelHandler,即Channel處理器,那麼自然也要有要處理的對象dubbo Channel

dubbo Channel

在 Dubbo 中會抽象出一個「端點(Endpoint)」的概念,我們可以通過一個 ip 和 port 唯一確定一個端點,兩個端點之間會創建 TCP 連接,可以雙向傳輸數據。Dubbo 將 Endpoint 之間的 TCP 連接抽象為通道(Channel),將發起請求的 Endpoint 抽象為客戶端(Client),將接收請求的 Endpoint 抽象為服務端(Server)。這些抽象出來的概念,也是整個 dubbo-remoting-api 模塊的基礎。
Channel 是對兩個 Endpoint 連接的抽象,好比連接兩個位置的傳送帶,兩個 Endpoint 傳輸的消息就好比傳送帶上的貨物,消息發送端會往 Channel 寫入消息,而接收端會從 Channel 讀取消息。

image-20220109234522114

dubbo Channel 我們就可以等同為netty 的Channel ,是個網絡通道。那既然有了netty Channel ,為什麼還要有個dubbo Channel 呢?由於我們使用dubbo都是使用的netty4,但是dubbo還支持mina、grizzly通信,他們這兩種有沒有Channel我就不知道了,這樣做是為了抽象,做成統一模式,這些通信變化,但是dubbo設計的通信並不會變化。

dubbo Channel 從接口定義來看,具有收發數據能力和附加 KV 屬性(即向通道增加一些屬性),比如NettyChannel、NettyClient就封裝了io.netty.channel.Channel

既然說到這裡就得總結dubbo remoting層的重要接口

dubbo remoting層的重要接口

com.alibaba.dubbo.remoting.Endpoint:在 Dubbo 中會抽象出一個「端點(Endpoint)」的概念,我們可以通過一個 ip 和 port 唯一確定一個端點,兩個端點之間會創建 TCP 連接,可以雙向傳輸數據。

com.alibaba.dubbo.remoting.Channel:Dubbo 將 Endpoint 之間的 TCP 連接抽象為通道(Channel),Channel有收發消息能力,可以認為等同netty Channel。

com.alibaba.dubbo.remoting.Client:將發起請求的 Endpoint 抽象為客戶端(Client)

com.alibaba.dubbo.remoting.Server:將接收請求的 Endpoint 抽象為服務端(Server)

Client、Server分別抽象了客戶端和服務端,兩者都繼承了 Channel、Resetable 等接口,也就是說兩者都具備了讀寫數據能力。Client 和 Server 本身都是 Endpoint,只不過在語義上區分了請求和響應的職責,兩者都具備發送的能力,所以都繼承了 Endpoint 接口。Client 和 Server 的主要區別是 Client 只能關聯一個 Channel,而 Server 可以接收多個 Client 發起的 Channel 連接。所以在 Server 接口中定義了查詢 Channel 的相關方法getChannels()/getChannel(InetSocketAddress remoteAddress)

com.alibaba.dubbo.remoting.ChannelHandler:ChannelHandler 是註冊在 Channel 上的消息處理器,在 Netty 中也有類似的抽象。有ChannelHandler chain分別承擔不同職責,功能靈活強大

com.alibaba.dubbo.remoting.Codec2:編解碼的定義,被 @SPI 接口修飾了,表示該接口是一個擴展接口,同時其 encode() 方法和 decode() 方法都被 @Adaptive 註解修飾,也就會生成適配器類,其中會根據 URL 中的 codec 值確定具體的擴展實現類。

com.alibaba.dubbo.remoting.Transporter:Dubbo 在 Client 和 Server 之上又封裝了一層Transporter 接口,Transporter 接口上有 @SPI 註解,它是一個擴展接口,默認使用「netty」這個擴展名,@Adaptive 註解的出現表示動態生成適配器類,會先後根據「server」「transporter」的值確定 RemotingServer 的擴展實現類,先後根據「client」「transporter」的值確定 Client 接口的擴展實現。具體有netty、mina、grizzly等不同通信實現。

Transporter 這一層抽象出來的接口,與 Netty 的核心接口是非常相似的。那為什麼要單獨抽象出 Transporter層,而不是像簡易版 RPC 框架那樣,直接讓上層使用 Netty 呢?

其實這個問題的答案也呼之欲出了,Netty、Mina、Grizzly 這個 NIO 庫對外接口和使用方式不一樣,如果在上層直接依賴了 Netty 或是 Grizzly,就依賴了具體的 NIO 庫實現,而不是依賴一個有傳輸能力的抽象,後續要切換實現的話,就需要修改依賴和接入的相關代碼,非常容易改出 Bug。這也不符合設計模式中的開放-封閉原則。

有了 Transporter 層之後,我們可以通過 Dubbo SPI 修改使用的具體 Transporter 擴展實現,從而切換到不同的 Client 和 RemotingServer 實現,達到底層 NIO 庫切換的目的,而且無須修改任何代碼。即使有更先進的 NIO 庫出現,我們也只需要開發相應的 dubbo-remoting-* 實現模塊提供 Transporter、Client、Server 等核心接口的實現,即可接入,完全符合開放-封閉原則。

com.alibaba.dubbo.remoting.Transporters:它不是一個接口,而是門面類,其中封裝了 Transporter 對象的創建(通過 Dubbo SPI)以及 ChannelHandler 的處理。在創建 Client 和 Server 的時候,可以指定多個 ChannelHandler 綁定到 Channel 來處理其中傳輸的數據。Transporters.connect() 方法和 bind() 方法中,會將多個 ChannelHandler 封裝成一個 ChannelHandlerDispatcher 對象。

ChannelHandlerDispatcher 也是 ChannelHandler 接口的實現類之一,維護了一個 CopyOnWriteArraySet 集合,它所有的 ChannelHandler 接口實現都會調用其中每個 ChannelHandler 元素的相應方法。另外,ChannelHandlerDispatcher 還提供了增刪該 ChannelHandler 集合的相關方法。

到此為止,Dubbo Transport 層的核心接口就介紹完了,這裡簡單總結一下:

簡單總結一下:

  • Endpoint 接口抽象了「端點」的概念,這是所有抽象接口的基礎。

  • 上層使用方會通過 Transporters 門面類獲取到 Transporter 的具體擴展實現,然後通過 Transporter 拿到相應的 Client 和 Server 實現,就可以建立(或接收)Channel 與遠端進行交互了。

  • 無論是 Client 還是 RemotingServer,都會使用 ChannelHandler 處理 Channel 中傳輸的數據,其中負責編解碼的 ChannelHandler 被抽象出為 Codec2 接口。

重要接口的說明參考,寫的很棒,直接拿來了