Java進階專題(二十六) 將近2萬字的Dubbo原理解析,徹底搞懂dubbo
前言
前面我們研究了RPC的原理,市面上有很多基於RPC思想實現的框架,比如有Dubbo。今天就從Dubbo的SPI機制、服務註冊與發現源碼及網路通訊過程去深入剖析下Dubbo。
Dubbo架構
概述
Dubbo是阿里巴巴公司開源的一個高性能優秀的服務框架,使得應用可通過高性能的RPC 實現服務的輸出和輸入功能,可以和Spring框架無縫集成。
Dubbo是一款高性能、輕量級的開源Java RPC框架,它提供了三大核心能力:面向介面的遠程方法調用,智慧容錯和負載均衡,以及服務自動註冊和發現。
調用流程:
- 服務容器負責啟動,載入,運行服務提供者。
- 服務提供者在啟動時,向註冊中心註冊自己提供的服務。
- 服務消費者在啟動時,向註冊中心訂閱自己所需的服務。
- 註冊中心返回服務提供者地址列表給消費者,如果有變更,註冊中心將基於長連接推送變更數據給消費者。
- 服務消費者,從提供者地址列表中,基於軟負載均衡演算法,選一台提供者進行調用,如果調用失敗,再選另一台調用。
- 服務消費者和提供者,在記憶體中累計調用次數和調用時間,定時每分鐘發送一次統計數據到監控中心。
架構體系
源碼結構
- dubbo-common:公共邏輯模組: 包括Util類和通用模型
- dubbo-remoting 遠程通訊模組: 相當於dubbo協議的實現,如果RPC使用RMI協議則不需要使用此包
- dubbo-rpc 遠程調用模組: 抽象各種協議,以及動態代理,包含一對一的調用,不關心集群的原理。
- dubbo-cluster 集群模組: 將多個服務提供方偽裝成一個提供方,包括負載均衡,容錯,路由等,集群的地址列表可以是靜態配置的,也可以是註冊中心下發的.
- dubbo-registry 註冊中心模組: 基於註冊中心下發的集群方式,以及對各種註冊中心的抽象
- dubbo-monitor 監控模組: 統計服務調用次數,調用時間,調用鏈跟蹤的服務.
- dubbo-config 配置模組: 是dubbo對外的api,用戶通過config使用dubbo,隱藏dubbo所有細節
- dubbo-container 容器模組: 是一個standlone的容器,以簡單的main載入spring啟動,因為服務通常不需要Tomcat/Jboss等web容器的特性,沒必要用web容器去載入服務.
整體設計
- 圖中左邊淡藍背景的為服務消費方使用的介面,右邊淡綠色背景的為服務提供方使用的介面,位於中軸線上的為雙方都用到的介面。
- 圖中從下至上分為十層,各層均為單向依賴,每一層都可以剝離上層被複用,其中,Service 和Config 層為API,其它各層均為SPI。
- 圖中綠色小塊的為擴展介面,藍色小塊為實現類,圖中只顯示用於關聯各層的實現類。
- 圖中藍色虛線為初始化過程,即啟動時組裝鏈,紅色實線為方法調用過程,即運行時調時鏈,紫色三角箭頭為繼承,可以把子類看作父類的同一個節點,線上的文字為調用的方法。
各層說明
- config 配置層:對外配置介面,以 ServiceConfig , ReferenceConfig 為中心,可以直接初始化配置類,也可以通過spring 解析配置生成配置類
- proxy 服務代理層:服務介面透明代理,生成服務的客戶端Stub 和伺服器端Skeleton, 以ServiceProxy 為中心,擴展介面為 ProxyFactory
- registry 註冊中心層:封裝服務地址的註冊與發現,以服務URL 為中心,擴展介面為RegistryFactory , Registry , RegistryService
- cluster 路由層:封裝多個提供者的路由及負載均衡,並橋接註冊中心,以 Invoker 為中心,擴展介面為 Cluster , Directory , Router , LoadBalance
- monitor 監控層:RPC 調用次數和調用時間監控,以 Statistics 為中心,擴展介面為MonitorFactory , Monitor , MonitorService
- protocol 遠程調用層:封裝RPC 調用,以 Invocation , Result 為中心,擴展介面為Protocol , Invoker , Exporter
- exchange 資訊交換層:封裝請求響應模式,同步轉非同步,以 Request , Response 為中心,擴展介面為 Exchanger , ExchangeChannel , ExchangeClient , ExchangeServer
- transport 網路傳輸層:抽象mina 和netty 為統一介面,以 Message 為中心,擴展介面為Channel , Transporter , Client , Server , Codec
- serialize 數據序列化層:可復用的一些工具,擴展介面為 Serialization , ObjectInput ,ObjectOutput , ThreadPool
調用流程
對照上面的整體架構圖可以大致分為以下步驟:
1、服務提供者啟動,開啟Netty服務,創建Zookeeper客戶端,向註冊中心註冊服務。
2、服務消費者啟動,通過Zookeeper向註冊中心獲取服務提供者列表,與服務提供者通過Netty建立長連接。
3、服務消費者通過介面開始遠程調用服務,ProxyFactory通過初始化Proxy對象,Proxy通過創建動態代理對象。
4、動態代理對象通過invoke方法,層層包裝生成一個Invoker對象,該對象包含了代理對象。
5、Invoker通過路由,負載均衡選擇了一個最合適的服務提供者,在通過加入各種過濾器,協議層包裝生成一個新的DubboInvoker對象。
6、再通過交換成將DubboInvoker對象包裝成一個Reuqest對象,該對象通過序列化通過NettyClient傳輸到服務提供者的NettyServer端。
7、到了服務提供者這邊,再通過反序列化、協議解密等操作生成一個DubboExporter對象,再層層傳遞處理,會生成一個服務提供端的Invoker對象.
8、這個Invoker對象會調用本地服務,獲得結果再通過層層回調返回到服務消費者,服務消費者拿到結果後,再解析獲得最終結果。
Dubbo中的SPI機制
什麼是SPI
概述
在Dubbo 中,SPI 是一個非常重要的模組。基於SPI,我們可以很容易的對Dubbo 進行拓展。如果大家想要學習Dubbo 的源碼,SPI 機制務必弄懂。接下來,我們先來了解一下Java SPI 與Dubbo SPI 的用法,然後再來分析Dubbo SPI 的源碼。
SPI是Service Provider Interface 服務提供介面縮寫,是一種服務發現機制。SPI的本質是將介面的實現類的全限定名定義在配置文件中,並有伺服器讀取配置文件,並載入實現類。這樣就可以在運行的時候,動態為介面替換實現類。
JDK中的SPI
Java SPI 實際上是「基於介面的編程+策略模式+配置文件」組合實現的動態載入機制。
通過一個案例我們來認識下SPI
定義一個介面:
package com.laowang;
/**
* @author 原
* @date 2021/3/27
* @since 1.0
**/
public interface User {
String showName();
}
定義兩個實現類
package com.laowang.impl;
import com.laowang.User;
/**
* @author 原
* @date 2021/3/27
* @since 1.0
**/
public class Student implements User {
@Override
public String showName() {
System.out.println("my name is laowang");
return null;
}
}
package com.laowang.impl;
import com.laowang.User;
/**
* @author 原
* @date 2021/3/27
* @since 1.0
**/
public class Teacher implements User {
@Override
public String showName() {
System.out.println("my name is zhangsan");
return null;
}
}
在resources目錄下創建文件夾META-INF.services,並在該文件夾下創建一個名稱與User的全路徑一致的文件com.laowang.User
在文件中寫入,兩個實現類的全路徑名
編寫測試類:
package com.laowang;
import java.util.ServiceLoader;
/**
* @author 原
* @date 2021/3/27
* @since 1.0
**/
public class SpiTest {
public static void main(String[] args) {
ServiceLoader<User> serviceLoader = ServiceLoader.load(User.class);
serviceLoader.forEach(User::showName);
}
}
運行結果:
我們發現通過SPI機制,幫我們自動運行了兩個實現類。
通過查看ServiceLoader源碼:
其實通過讀取配置文件中實現類的全路徑類名,通過反射創建對象,並放入providers容器中。
總結:
調用過程
應用程式調用ServiceLoader.load方法,創建一個新的ServiceLoader,並實例化該類中的成員變數
應用程式通過迭代器介面獲取對象實例,ServiceLoader先判斷成員變數providers對象中(LinkedHashMap<String,S>類型)是否有快取實例對象,如果有快取,直接返回。如果沒有快取,執行類的裝載,
優點
使用Java SPI 機制的優勢是實現解耦,使得介面的定義與具體業務實現分離,而不是耦合在一起。應用進程可以根據實際業務情況啟用或替換具體組件。
缺點
不能按需載入。雖然ServiceLoader 做了延遲載入,但是基本只能通過遍歷全部獲取,也就是介面的實現類得全部載入並實例化一遍。如果你並不想用某些實現類,或者某些類實例化很耗時,它也被載入並實例化了,這就造成了浪費。
獲取某個實現類的方式不夠靈活,只能通過Iterator 形式獲取,不能根據某個參數來獲取對應的實現類。
多個並發多執行緒使用ServiceLoader 類的實例是不安全的。
載入不到實現類時拋出並不是真正原因的異常,錯誤很難定位。
Dubbo中的SPI
Dubbo 並未使用Java SPI,而是重新實現了一套功能更強的SPI 機制。Dubbo SPI 的相關邏輯被封裝在了ExtensionLoader 類中,通過ExtensionLoader,我們可以載入指定的實現類。
栗子
與Java SPI 實現類配置不同,Dubbo SPI 是通過鍵值對的方式進行配置,這樣我們可以按需載入指定的實現類。下面來演示Dubbo SPI 的用法:
Dubbo SPI 所需的配置文件需放置在META-INF/dubbo 路徑下,與Java SPI 實現類配置不同,DubboSPI 是通過鍵值對的方式進行配置,配置內容如下。
optimusPrime = org.apache.spi.OptimusPrime
bumblebee = org.apache.spi.Bumblebee
在使用Dubbo SPI 時,需要在介面上標註@SPI 註解。
@SPI
public interface Robot {
void sayHello();
}
通過ExtensionLoader,我們可以載入指定的實現類,下面來演示Dubbo SPI :
public class DubboSPITest {
@Test
public void sayHello() throws Exception {
ExtensionLoader<Robot> extensionLoader =
ExtensionLoader.getExtensionLoader(Robot.class);
Robot optimusPrime = extensionLoader.getExtension("optimusPrime");
optimusPrime.sayHello();
Robot bumblebee = extensionLoader.getExtension("bumblebee");
bumblebee.sayHello();
}
}
Dubbo SPI 除了支援按需載入介面實現類,還增加了IOC 和AOP 等特性,這些特性將會在接下來的源碼分析章節中一一進行介紹。
源碼分析
ExtensionLoader 的getExtensionLoader 方法獲取一個ExtensionLoader 實例,然後再通過ExtensionLoader 的getExtension 方法獲取拓展類對象。下面我們從ExtensionLoader 的getExtension 方法作為入口,對拓展類對象的獲取過程進行詳細的分析。
public T getExtension(String name) {
if (StringUtils.isEmpty(name)) {
throw new IllegalArgumentException("Extension name == null");
}
if ("true".equals(name)) {
// 獲取默認的拓展實現類
return getDefaultExtension();
}
// Holder,顧名思義,用於持有目標對象 就是從容器中獲取,如果沒有直接new一個Holder
Holder<Object> holder = getOrCreateHolder(name);
//獲取目標對象實例
Object instance = holder.get();
// 如果目標對象實例為null 就需要通過雙重檢查創建實例
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
// 創建拓展實例
instance = createExtension(name);
// 設置實例到 holder 中
holder.set(instance);
}
}
}
return (T) instance;
}
上面程式碼的邏輯比較簡單,首先檢查快取,快取未命中則創建拓展對象。下面我們來看一下創建拓展對象的過程是怎樣的。
private T createExtension(String name) {
// 從配置文件中載入所有的拓展類,可得到「配置項名稱」到「配置類」的映射關係表
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
//從容器中獲取對應的實例對象 如果不存在就通過反射創建
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
// 通過反射創建實例
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
// 向實例中注入依賴 下面是IOC和AOP的實現
injectExtension(instance);
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
// 循環創建 Wrapper 實例
for (Class<?> wrapperClass : wrapperClasses) {
// 將當前 instance 作為參數傳給 Wrapper 的構造方法,並通過反射創建Wrapper 實例。
// 然後向 Wrapper 實例中注入依賴,最後將 Wrapper 實例再次賦值給instance 變數
instance = injectExtension(
(T)
wrapperClass.getConstructor(type).newInstance(instance));
}
}
createExtension 方法的邏輯稍複雜一下,包含了如下的步驟:
-
通過getExtensionClasses 獲取所有的拓展類
-
通過反射創建拓展對象
-
向拓展對象中注入依賴
-
將拓展對象包裹在相應的Wrapper 對象中
以上步驟中,第一個步驟是載入拓展類的關鍵,第三和第四個步驟是Dubbo IOC 與AOP 的具體實現。由於此類設計源碼較多,這裡簡單的總結下ExtensionLoader整個執行邏輯:
getExtension(String name) #根據key獲取拓展對象
-->createExtension(String name) #創建拓展實例
-->getExtensionClasses #根據路徑獲取所有的拓展類
-->loadExtensionClasses #載入拓展類
-->cacheDefaultExtensionName #解析@SPI註解
-->loadDirectory #方法載入指定文件夾配置文件
-->loadResource #載入資源
-->loadClass #載入類,並通過 loadClass 方法對類進行快取
Dubbo的SPI如何實現IOC和AOP的
Dubbo IOC
Dubbo IOC 是通過setter 方法注入依賴。Dubbo 首先會通過反射獲取到實例的所有方法,然後再遍歷方法列表,檢測方法名是否具有setter 方法特徵。若有,則通過ObjectFactory 獲取依賴對象,最後通過反射調用setter 方法將依賴設置到目標對象中。整個過程對應的程式碼如下:
private T injectExtension(T instance) {
try {
if (objectFactory != null) {
//獲取實例的所有方法
for (Method method : instance.getClass().getMethods()) {
//isSetter做的事:檢測方法是否以 set 開頭,且方法僅有一個參數,且方法訪問級別為 public
if (isSetter(method)) {
/**
* Check {@link DisableInject} to see if we need auto injection for this property
*/
if (method.getAnnotation(DisableInject.class) != null) {
continue;
}
Class<?> pt = method.getParameterTypes()[0];
if (ReflectUtils.isPrimitives(pt)) {
continue;
}
try {
String property = getSetterProperty(method);
//獲取依賴對象
Object object = objectFactory.getExtension(pt, property);
if (object != null) {
//設置屬性
method.invoke(instance, object);
}
} catch (Exception e) {
logger.error("Failed to inject via method " + method.getName()
+ " of interface " + type.getName() + ": " + e.getMessage(), e);
}
}
}
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
return instance;
}
Dubbo Aop
在說這個之前,我們得先知道裝飾者模式
裝飾者模式:在不改變原類文件以及不使用繼承的情況下,動態地將責任附加到對象上,從而實現動態拓展一個對象的功能。它是通過創建一個包裝對象,也就是裝飾來包裹真實的對象。
在用Spring的時候,我們經常會用到AOP功能。在目標類的方法前後插入其他邏輯。比如通常使用Spring AOP來實現日誌,監控和鑒權等功能。Dubbo的擴展機制,是否也支援類似的功能呢?答案是yes。在Dubbo中,有一種特殊的類,被稱為Wrapper類。通過裝飾者模式,使用包裝類包裝原始的擴展點實例。在原始擴展點實現前後插入其他邏輯,實現AOP功能。
一般來說裝飾者模式有下面幾個參與者:
Component:裝飾者和被裝飾者共同的父類,是一個介面或者抽象類,用來定義基本行為
ConcreteComponent:定義具體對象,即被裝飾者
Decorator:抽象裝飾者,繼承自Component,從外類來擴展ConcreteComponent。對於ConcreteComponent來說,不需要知道Decorator的存在,Decorator是一個介面或抽象類
ConcreteDecorator:具體裝飾者,用於擴展ConcreteComponent
//獲取所有需要包裝的類
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
我們再看看cachedWrapperClasses是什麼?
private Set<Class<?>> cachedWrapperClasses;
是一個set集合,那麼集合是什麼時候添加元素的呢?
/**
* cache wrapper class
* <p>
* like: ProtocolFilterWrapper, ProtocolListenerWrapper
*/
private void cacheWrapperClass(Class<?> clazz) {
if (cachedWrapperClasses == null) {
cachedWrapperClasses = new ConcurrentHashSet<>();
}
cachedWrapperClasses.add(clazz);
}
通過這個方法添加的,再看看誰調用了這個私有方法:
/**
* test if clazz is a wrapper class
* <p>
* which has Constructor with given class type as its only argument
*/
private boolean isWrapperClass(Class<?> clazz) {
try {
clazz.getConstructor(type);
return true;
} catch (NoSuchMethodException e) {
return false;
}
}
原來是通過isWrapperClass這個方法,判斷有沒有其他對象中的構造方法中持有本對象,如果有,dubbo就認為這是個裝飾類,調用裝飾者類的構造方法,並返回實例對象
然後通過實例化這個包裝類代替需要載入的這個類。這樣執行的方法就是包裝類的方法。
Dubbo中的動態編譯
我們知道在Dubbo 中,很多拓展都是通過SPI 機制 進行載入的,比如Protocol、Cluster、LoadBalance、ProxyFactory 等。有時,有些拓展並不想在框架啟動階段被載入,而是希望在拓展方法被調用時,根據運行時參數進行載入,即根據參數動態載入實現類。
這種在運行時,根據方法參數才動態決定使用具體的拓展,在dubbo中就叫做擴展點自適應實例。其實是一個擴展點的代理,將擴展的選擇從Dubbo啟動時,延遲到RPC調用時。Dubbo中每一個擴展點都有一個自適應類,如果沒有顯式提供,Dubbo會自動為我們創建一個,默認使用Javaassist。
自適應拓展機制的實現邏輯是這樣的
- 首先Dubbo 會為拓展介面生成具有代理功能的程式碼;
- 通過javassist 或jdk 編譯這段程式碼,得到Class 類;
- 通過反射創建代理類;
- 在代理類中,通過URL對象的參數來確定到底調用哪個實現類;
javassist
Javassist是一個開源的分析、編輯和創建Java位元組碼的類庫。是由東京工業大學的數學和電腦科學系的Shigeru Chiba (千葉滋)所創建的。它已加入了開放源程式碼JBoss 應用伺服器項目,通過使用Javassist對位元組碼操作為JBoss實現動態AOP框架。javassist是jboss的一個子項目,其主要的優點,在於簡單,而且快速。直接使用java編碼的形式,而不需要了解虛擬機指令,就能動態改變類的結構,或者動態生成類。
/**
* Javassist是一個開源的分析、編輯和創建Java位元組碼的類庫
* 能動態改變類的結構,或者動態生成類
*/
public class CompilerByJavassist {
public static void main(String[] args) throws Exception {
// ClassPool:class對象容器
ClassPool pool = ClassPool.getDefault();
// 通過ClassPool生成一個User類
CtClass ctClass = pool.makeClass("com.itheima.domain.User");
// 添加屬性 -- private String username
CtField enameField = new CtField(pool.getCtClass("java.lang.String"),
"username", ctClass);
enameField.setModifiers(Modifier.PRIVATE);
ctClass.addField(enameField);
// 添加屬性 -- private int age
CtField enoField = new CtField(pool.getCtClass("int"), "age", ctClass);
enoField.setModifiers(Modifier.PRIVATE);
ctClass.addField(enoField);
//添加方法
ctClass.addMethod(CtNewMethod.getter("getUsername", enameField));
ctClass.addMethod(CtNewMethod.setter("setUsername", enameField));
ctClass.addMethod(CtNewMethod.getter("getAge", enoField));
ctClass.addMethod(CtNewMethod.setter("setAge", enoField));
// 無參構造器
CtConstructor constructor = new CtConstructor(null, ctClass);
constructor.setBody("{}");
ctClass.addConstructor(constructor);
// 添加構造函數
//ctClass.addConstructor(new CtConstructor(new CtClass[] {}, ctClass));
CtConstructor ctConstructor = new CtConstructor(new CtClass[]
{pool.get(String.class.getName()),CtClass.intType}, ctClass);
ctConstructor.setBody("{\n this.username=$1; \n this.age=$2;\n}");
ctClass.addConstructor(ctConstructor);
// 添加自定義方法
CtMethod ctMethod = new CtMethod(CtClass.voidType, "printUser",new
CtClass[] {}, ctClass);
// 為自定義方法設置修飾符
ctMethod.setModifiers(Modifier.PUBLIC);
// 為自定義方法設置函數體
StringBuffer buffer2 = new StringBuffer();
buffer2.append("{\nSystem.out.println(\"用戶資訊如下\");\n")
.append("System.out.println(\"用戶名=\"+username);\n")
.append("System.out.println(\"年齡=\"+age);\n").append("}");
ctMethod.setBody(buffer2.toString());
ctClass.addMethod(ctMethod);
//生成一個class
Class<?> clazz = ctClass.toClass();
Constructor cons2 =
clazz.getDeclaredConstructor(String.class,Integer.TYPE);
Object obj = cons2.newInstance("itheima",20);
//反射 執行方法
obj.getClass().getMethod("printUser", new Class[] {})
.invoke(obj, new Object[] {});
// 把生成的class文件寫入文件
byte[] byteArr = ctClass.toBytecode();
FileOutputStream fos = new FileOutputStream(new File("D://User.class"));
fos.write(byteArr);
fos.close();
}
}
源碼分析
Adaptive註解
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Adaptive {
String[] value() default {};
}
Adaptive 可註解在類或方法上。
標註在類上:Dubbo 不會為該類生成代理類。
標註在方法上:Dubbo 則會為該方法生成代理邏輯,表示當前方法需要根據 參數URL 調用對應的擴展點實現。
dubbo中每一個擴展點都有一個自適應類,如果沒有顯式提供,Dubbo會自動為我們創建一個,默認使用Javaassist。 先來看下創建自適應擴展類的程式碼
//1、看下extensionLoader的獲取方法
ExtensionLoader<Robot>extensionLoader=ExtensionLoader.getExtensionLoader(Robot.class);
//2、最終調用的是ExtensionLoader的構造方法
private ExtensionLoader(Class<?> type) {
this.type = type;
objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
}
//3、getAdaptiveExtension()看看幹了什麼事
public T getAdaptiveExtension() {
//獲取自適應擴展類,如果沒有就開始初始化一個
Object instance = cachedAdaptiveInstance.get();
if (instance == null) {
if (createAdaptiveInstanceError == null) {
synchronized (cachedAdaptiveInstance) {
instance = cachedAdaptiveInstance.get();
if (instance == null) {
try {
//這裡創建了一個自適應擴展類
instance = createAdaptiveExtension();
cachedAdaptiveInstance.set(instance);
} catch (Throwable t) {
createAdaptiveInstanceError = t;
throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);
}
}
}
} else {
throw new IllegalStateException("Failed to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
}
}
return (T) instance;
}
//看看createAdaptiveExtension()
private T createAdaptiveExtension() {
try {
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
} catch (Exception e) {
throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);
}
}
//再進到getAdaptiveExtensionClass()
private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses();
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass;
}
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}
//繼續追進去createAdaptiveExtensionClass()
private Class<?> createAdaptiveExtensionClass() {
String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
ClassLoader classLoader = findClassLoader();
org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}
//看看compiler
@SPI("javassist")
public interface Compiler {
/**
* Compile java source code.
*
* @param code Java source code
* @param classLoader classloader
* @return Compiled class
*/
Class<?> compile(String code, ClassLoader classLoader);
}
//其實到這裡就知道了,通過生成一個類的字元串,再通過javassist生成一個對象
createAdaptiveExtensionClassCode()方法中使用一個StringBuilder來構建自適應類的Java源碼。方法實現比較長,這裡就不貼程式碼了。這種生成位元組碼的方式也挺有意思的,先生成Java源程式碼,然後編譯,載入到jvm中。通過這種方式,可以更好的控制生成的Java類。而且這樣也不用care各個位元組碼生成框架的api等。因為xxx.java文件是Java通用的,也是我們最熟悉的。只是程式碼的可讀性不強,需要一點一點構建xx.java的內容。
服務暴露與發現
服務暴露
名詞解釋
在Dubbo 的核心領域模型中:
- Invoker 是實體域,它是Dubbo 的核心模型,其它模型都向它靠擾,或轉換成它,它代表一個可執行體,可向它發起invoke 調用,它有可能是一個本地的實現,也可能是一個遠程的實現,也可能一個集群實現。在服務提供方,Invoker用於調用服務提供類。在服務消費方,Invoker用於執行遠程調用。
- Protocol 是服務域,它是Invoker 暴露和引用的主功能入口,它負責Invoker 的生命周期管理。
export:暴露遠程服務
refer:引用遠程服務 - proxyFactory:獲取一個介面的代理類
getInvoker:針對server端,將服務對象,如DemoServiceImpl包裝成一個Invoker對象
getProxy:針對client端,創建介面的代理對象,例如DemoService的介面。 - Invocation 是會話域,它持有調用過程中的變數,比如方法名,參數等
整體流程
在詳細探討服務暴露細節之前 , 我們先看一下整體duubo的服務暴露原理
在整體上看,Dubbo 框架做服務暴露分為兩大部分 , 第一步將持有的服務實例通過代理轉換成Invoker, 第二步會把Invoker 通過具體的協議 ( 比如Dubbo ) 轉換成Exporter, 框架做了這層抽象也大大方便了功能擴展 。
服務提供方暴露服務的藍色初始化鏈,時序圖如下:
源碼分析
服務導出的入口方法是ServiceBean 的onApplicationEvent。onApplicationEvent 是一個事件響應方法,該方法會在收到Spring 上下文刷新事件後執行服務導出操作。方法程式碼如下:
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (!isExported() && !isUnexported()) {
if (logger.isInfoEnabled()) {
logger.info("The service ready on spring started. service: " + getInterface());
}
export();
}
}
通過export最終找到doExportUrls()方法
private void doExportUrls() {
//載入配置文件中的所有註冊中心,並且封裝為dubbo內部的URL對象列表
List<URL> registryURLs = loadRegistries(true);
//循環所有協議配置,根據不同的協議,向註冊中心中發起註冊
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), group, version);
ProviderModel providerModel = new ProviderModel(pathKey, ref, interfaceClass);
ApplicationModel.initProviderModel(pathKey, providerModel);
//服務暴露方法
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
doExportUrlsFor1Protocol()方法程式碼老多了,我們只關係核心的地方
...
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
//本地暴露,將服務數據記錄到本地JVM中
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
//遠程暴露,向註冊中心發送數據
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (!isOnlyInJvm() && logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
if (CollectionUtils.isNotEmpty(registryURLs)) {
for (URL registryURL : registryURLs) {
//if protocol is only injvm ,not register
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
URL monitorUrl = loadMonitor(registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
}
// For providers, this is used to enable custom proxy to generate invoker
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
// 為服務提供類(ref)生成 Invoker
Invoker<?> invoker = proxyFactory.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
// DelegateProviderMetaDataInvoker 用於持有 Invoker 和ServiceConfig
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
// 導出服務,並生成 Exporter
Exporter<?> exporter = protocol.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
//不存在註冊中心,僅導出服務
....
}
/**
* @since 2.7.0
* ServiceData Store
*/
MetadataReportService metadataReportService = null;
if ((metadataReportService = getMetadataReportService()) != null) {
metadataReportService.publishProvider(url);
}
}
}
this.urls.add(url);
上面程式碼根據url 中的scope 參數決定服務導出方式,分別如下:
scope = none,不導出服務
scope != remote,導出到本地
scope != local,導出到遠程
不管是導出到本地,還是遠程。進行服務導出之前,均需要先創建Invoker,這是一個很重要的步驟。因此下面先來分析Invoker 的創建過程。Invoker 是由ProxyFactory 創建而來,Dubbo 默認的ProxyFactory 實現類是JavassistProxyFactory。下面我們到JavassistProxyFactory 程式碼中,探索Invoker 的創建過程。如下:
@Override
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) {
// 為目標類創建warpper
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
//創建匿名才invoker對象,並實現doinvoke方法
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
// 調用 Wrapper 的 invokeMethod 方法,invokeMethod 最終會調用目標方法
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}
Invoke創建成功之後,接下來我們來看本地導出
/**
* always export injvm
*/
private void exportLocal(URL url) {
URL local = URLBuilder.from(url)
.setProtocol(LOCAL_PROTOCOL) // 設置協議頭為 injvm
.setHost(LOCALHOST_VALUE)//本地ip:127.0.0.1
.setPort(0)
.build();
// 創建 Invoker,並導出服務,這裡的 protocol 會在運行時調用 InjvmProtocol 的export 方法
Exporter<?> exporter = protocol.export(
proxyFactory.getInvoker(ref, (Class) interfaceClass, local));
exporters.add(exporter);
logger.info("Export dubbo service " + interfaceClass.getName() + " to local registry url : " + local);
}
exportLocal 方法比較簡單,首先根據URL 協議頭決定是否導出服務。若需導出,則創建一個新的URL並將協議頭、主機名以及埠設置成新的值。然後創建Invoker,並調用InjvmProtocol 的export 方法導出服務。下面我們來看一下InjvmProtocol 的export 方法都做了哪些事情。
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
return new InjvmExporter<T>(invoker, invoker.getUrl().getServiceKey(), exporterMap);
}
如上,InjvmProtocol 的export 方法僅創建了一個InjvmExporter,無其他邏輯。到此導出服務到本地就分析完了。
再看看導出服務到遠程
接下來,我們繼續分析導出服務到遠程的過程。導出服務到遠程包含了服務導出與服務註冊兩個過程。先來分析服務導出邏輯。我們把目光移動到RegistryProtocol 的export 方法上。
@Override
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
// 獲取註冊中心 URL
URL registryUrl = getRegistryUrl(originInvoker);
URL providerUrl = getProviderUrl(originInvoker);
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//導出服務
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
// 根據 URL 載入 Registry 實現類,比如 ZookeeperRegistry
final Registry registry = getRegistry(originInvoker);
//獲取已註冊的服務提供者 URL,
final URL registeredProviderUrl = getRegisteredProviderUrl(providerUrl, registryUrl);
ProviderInvokerWrapper<T> providerInvokerWrapper = ProviderConsumerRegTable.registerProvider(originInvoker,
registryUrl, registeredProviderUrl);
//to judge if we need to delay publish
boolean register = registeredProviderUrl.getParameter("register", true);
if (register) {
// 向註冊中心註冊服務
register(registryUrl, registeredProviderUrl);
providerInvokerWrapper.setReg(true);
}
// 向註冊中心進行訂閱 override 數據
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);
// 創建並返回 DestroyableExporter
return new DestroyableExporter<>(exporter);
}
上面程式碼看起來比較複雜,主要做如下一些操作:
- 調用doLocalExport 導出服務
- 向註冊中心註冊服務
- 向註冊中心進行訂閱override 數據
- 創建並返回DestroyableExporter
看看doLocalExport 做了什麼
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
//protocol和配置的協議相關(dubbo:DubboProtocol)
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}
接下來,我們把重點放在Protocol 的export 方法上。假設運行時協議為dubbo,此處的protocol 變數會在運行時載入DubboProtocol,並調用DubboProtocol 的export 方法。
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.獲取服務標識,理解成服務坐標也行。由服務組名,服務名,服務版本號以及埠組成。比如:demoGroup/com.alibaba.dubbo.demo.DemoService:1.0.1:20880
String key = serviceKey(url);
//創建DubboExporter
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter); //key:介面 (DemoService)
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
} else {
stubServiceMethodsMap.put(url.getServiceKey(), stubServiceMethods);
}
}
//啟動服務
openServer(url);
//優化序列器
optimizeSerialization(url);
return exporter;
}
如上,我們重點關注DubboExporter 的創建以及openServer 方法,其他邏輯看不懂也沒關係,不影響理解服務導出過程。下面分析openServer 方法。
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
//訪問快取
ExchangeServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
//創建伺服器實例
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
接下來分析伺服器實例的創建過程。如下
private ExchangeServer createServer(URL url) {
url = URLBuilder.from(url)
// send readonly event when server closes, it's enabled by default
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// enable heartbeat by default
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
// 通過 SPI 檢測是否存在 server 參數所代表的 Transporter 拓展,不存在則拋出異常
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
ExchangeServer server;
try {
// 創建 ExchangeServer
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
// 獲取 client 參數,可指定 netty,mina
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
// 獲取所有的 Transporter 實現類名稱集合,比如 supportedTypes = [netty, mina]
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
// 檢測當前 Dubbo 所支援的 Transporter 實現類名稱列表中,
// 是否包含 client 所表示的 Transporter,若不包含,則拋出異常
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return server;
}
如上,createServer 包含三個核心的邏輯。
第一是檢測是否存在server 參數所代表的Transporter 拓展,不存在則拋出異常。
第二是創建伺服器實例。
第三是檢測是否支援client 參數所表示的Transporter 拓展,不存在也是拋出異常。兩次檢測操作所對應的程式碼較直白了,無需多說。但創建伺服器的操作目前還不是很清晰,我們繼續往下看。
public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
// 獲取 Exchanger,默認為 HeaderExchanger。
// 緊接著調用 HeaderExchanger 的 bind 方法創建 ExchangeServer 實例
return getExchanger(url).bind(url, handler);
}
上面程式碼比較簡單,就不多說了。下面看一下HeaderExchanger 的bind 方法。
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
// 創建 HeaderExchangeServer 實例,該方法包含了多個邏輯,分別如下:
// 1. new HeaderExchangeHandler(handler)
// 2. new DecodeHandler(new HeaderExchangeHandler(handler))
// 3. Transporters.bind(url, new DecodeHandler(new
HeaderExchangeHandler(handler)))
return new HeaderExchangeServer(Transporters.bind(url, new ChannelHandler[]{new DecodeHandler(new HeaderExchangeHandler(handler))}));
}
HeaderExchanger 的bind 方法包含的邏輯比較多,但目前我們僅需關心Transporters 的bind 方法邏
輯即可。該方法的程式碼如下:
public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
} else if (handlers != null && handlers.length != 0) {
Object handler;
if (handlers.length == 1) {
handler = handlers[0];
} else {
// 如果 handlers 元素數量大於1,則創建 ChannelHandler 分發器
handler = new ChannelHandlerDispatcher(handlers);
}
// 獲取自適應 Transporter 實例,並調用實例方法
return getTransporter().bind(url, (ChannelHandler)handler);
} else {
throw new IllegalArgumentException("handlers == null");
}
}
如上,getTransporter() 方法獲取的Transporter 是在運行時動態創建的,類名為TransporterAdaptive,也就是自適應拓展類。TransporterAdaptive 會在運行時根據傳入的URL 參數決定載入什麼類型的Transporter,默認為NettyTransporter。調用 NettyTransporter.bind(URL,ChannelHandler) 方法。創建一個 NettyServer 實例。調用 NettyServer.doOPen() 方法,伺服器被開啟,服務也被暴露出來了。
服務註冊
本節內容以Zookeeper 註冊中心作為分析目標,其他類型註冊中心大家可自行分析。下面從服務註冊
的入口方法開始分析,我們把目光再次移到RegistryProtocol 的export 方法上。如下:
進入到register()方法
public void register(URL registryUrl, URL registeredProviderUrl) {
//獲得註冊中心實例
Registry registry = registryFactory.getRegistry(registryUrl);
//進行註冊
registry.register(registeredProviderUrl);
}
看看getRegistry()方法
@Override
public Registry getRegistry(URL url) {
url = URLBuilder.from(url)
.setPath(RegistryService.class.getName())
.addParameter(INTERFACE_KEY, RegistryService.class.getName())
.removeParameters(EXPORT_KEY, REFER_KEY)
.build();
String key = url.toServiceStringWithoutResolving();
// Lock the registry access process to ensure a single instance of the registry
LOCK.lock();
try {
Registry registry = REGISTRIES.get(key);
if (registry != null) {
return registry;
}
//create registry by spi/ioc
registry = createRegistry(url);
if (registry == null) {
throw new IllegalStateException("Can not create registry " + url);
}
REGISTRIES.put(key, registry);
return registry;
} finally {
// Release the lock
LOCK.unlock();
}
}
進入createRegistry()方法
@Override
public Registry createRegistry(URL url) {
return new ZookeeperRegistry(url, zookeeperTransporter);
}
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
super(url);
if (url.isAnyHost()) {
throw new IllegalStateException("registry address == null");
}
//// 獲取組名,默認為 dubbo
String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
if (!group.startsWith(PATH_SEPARATOR)) {
group = PATH_SEPARATOR + group;
}
this.root = group;
// 創建 Zookeeper 客戶端,默認為 CuratorZookeeperTransporter
zkClient = zookeeperTransporter.connect(url);
// 添加狀態監聽器
zkClient.addStateListener(state -> {
if (state == StateListener.RECONNECTED) {
try {
recover();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
});
}
在上面的程式碼程式碼中,我們重點關注ZookeeperTransporter 的connect 方法調用,這個方法用於創建
Zookeeper 客戶端。創建好Zookeeper 客戶端,意味著註冊中心的創建過程就結束了。
搞懂了服務註冊的本質,那麼接下來我們就可以去閱讀服務註冊的程式碼了。
public void doRegister(URL url) {
try {
// 通過 Zookeeper 客戶端創建節點,節點路徑由 toUrlPath 方法生成,路徑格式如下:
// /${group}/${serviceInterface}/providers/${url}
// 比如 /dubbo/org.apache.dubbo.DemoService/providers/dubbo%3A%2F%2F127.0.0.1......
zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
} catch (Throwable e) {
throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
@Override
public void create(String path, boolean ephemeral) {
if (!ephemeral) {
// 如果要創建的節點類型非臨時節點,那麼這裡要檢測節點是否存在
if (checkExists(path)) {
return;
}
}
int i = path.lastIndexOf('/');
if (i > 0) {
// 遞歸創建上一級路徑
create(path.substring(0, i), false);
}
// 根據 ephemeral 的值創建臨時或持久節點
if (ephemeral) {
createEphemeral(path);
} else {
createPersistent(path);
}
}
好了,到此關於服務註冊的過程就分析完了。整個過程可簡單總結為:先創建註冊中心實例,之後再通過註冊中心實例註冊服務。
總結
- 在有註冊中心,需要註冊提供者地址的情況下,ServiceConfig 解析出的URL 格式為:registry:// registry-host/org.apache.dubbo.registry.RegistryService?export=URL.encode(“dubbo://service-host/{服務名}/{版本號}”)
- 基於Dubbo SPI 的自適應機制,通過URL registry:// 協議頭識別,就調用RegistryProtocol#export() 方法
- 將具體的服務類名,比如 DubboServiceRegistryImpl ,通過ProxyFactory 包裝成Invoker 實例
- 調用doLocalExport 方法,使用DubboProtocol 將Invoker 轉化為Exporter 實例,並打開Netty 服務端監聽客戶請求
- 創建Registry 實例,連接Zookeeper,並在服務節點下寫入提供者的URL 地址,註冊服務
- 向註冊中心訂閱override 數據,並返回一個Exporter 實例
- 根據URL 格式中的 “dubbo://service-host/{服務名}/{版本號}” 中協議頭 dubbo:// 識別,調用
DubboProtocol#export()
方法,開發服務埠 - RegistryProtocol#export() 返回的Exporter 實例存放到ServiceConfig 的
List<Exporter>exporters
中