《高性能利器》–非同步調用實現原理詳解!
概述
高可用系列文章
算是告一段落了,主要講了 限流
,熔斷
,削峰
和 Sentinel實戰
:
- 《高可用系列》– 你來說說什麼是限流?
- 《高可用系列》– 限流神器Sentinel,不了解一下嗎?
- 《高可用系列》– 阿里P7大佬帶你解密Sentinel
- 《高可用系列》– 熔斷降級我學會了!
- 《高可用系列》–來聊聊削峰填谷!
接下來進入到大家比較喜歡的 高性能系列
,主題內容包括,消息隊列
,快取
和分散式部署架構
等,在上一篇文章- # 秒殺系統架構圖該怎麼畫?手把手教你! ,講解了部落客凄慘
的經歷,因此在學習相關技術的時候,我們要將其運用到我們實際的項目中,在高性能篇
結束後,將進入架構圖2.0版本
~
什麼是非同步
同步調用:調用方在調用過程中,持續等待返回結果。
非同步調用:調用方在調用過程中,不直接等待返回結果,而是執行其他任務,結果返回形式通常為回調函數。
脫離IO
,單獨討論同步
和非同步
,我們更加容易去理解它的原理,同步和非同步其實屬於一種通訊機制
,表達的是,我們在通訊過程中,是主動去詢問,還是等對方主動回饋。體現在同步(主動)
和非同步(被動)
上。
進程內非同步調用
1、Thread
進程和執行緒:進程是資源分配的最小單位,執行緒是CPU調度的最小單位
Java進程
內最簡單的非同步調用方式,就是通過 new Thread().start()
的方式,啟動新的執行緒進行任務的執行(CPU調度
)。
public static void main(String[] args) {
System.out.println("煲水");
//創建新的執行緒
Thread thread1= new Thread(()->{
try {
Thread.sleep(5000);
System.out.println("水開了,"+Thread.currentThread().getName());
}catch (Exception e){
e.printStackTrace();
}
});
thread1.start();
System.out.println("運動");
}
1.1、start() 和 run()
在上述實例程式碼中,我們雖然採用了實現 Runnable
介面的方式,進行新執行緒的實現,但是在方法啟動時,並沒有使用 run()
方法,而是使用了 start()
方法。
run():使用當前執行緒執行 run()方法調用,可以理解時同步調用
start()
方法在調用時,在程式碼邏輯中,會調用到一個本地方法 start0
,
下載 JDK源碼 後,可以看到 Thread 類
有個 registerNatives
本地方法,該方法主要的作用就是註冊一些本地方法供 Thread 類使用,如 start0(),stop0()
等等,可以說,所有操作本地執行緒的本地方法都是由它註冊的。
可以看出 Java 執行緒
調用 start->start0
的方法,實際上會調用到 JVM_StartThread
方法,通過調用 new JavaThread(&thread_entry,sz)
完成執行緒的創建。
在 jvm.cpp
中,有如下程式碼段:
在創建完執行緒後,通過 thread_entry
完成 run()
方法的調用
1.2、Future
Future
的調用方式,屬於同步非阻塞
, 主要原因在於,在獲取非同步執行緒處理結果時,需要主執行緒主動去獲取,非同步執行緒並沒有通過主動通知
的方式,將數據結構進行更新
或回調
。
public static void main(String[] args) throws Exception {
System.out.println("煲水");
FutureTask<String> futureTask = new FutureTask(()->{
try {
Thread.sleep(5000);
System.out.println("水開了,"+Thread.currentThread().getName());
}catch (Exception e){
e.printStackTrace();
}
return "water";
});
//創建新的執行緒
Thread thread1= new Thread(futureTask);
thread1.start();
System.out.println("運動");
Thread.sleep(3000);
//阻塞等待數據
String result= futureTask.get(5, TimeUnit.SECONDS);
System.out.println("喝水," + result);
}
Future 的實現原理
類的繼承關係圖如下,可以看到 FutureTask
,實現了 Runnable
介面,那麼在重寫的run()
方法中,可以看到,在調用 call()
方法獲取到結果後,通過CAS
的方式,更新到成員變數
中。
任務調用結果更新:
1.3、ThreadPoolExecutor
public static void main(String[] args) throws Exception {
ExecutorService executors = Executors.newFixedThreadPool(10);
System.out.println("煲水");
Future<String> future = executors.submit(() -> {
try {
Thread.sleep(5000);
System.out.println("水開了," + Thread.currentThread().getName());
} catch (Exception e) {
e.printStackTrace();
}
return "water";
});
System.out.println("運動");
String result = future.get(5, TimeUnit.SECONDS);
System.out.println("喝水," + result);
}
上面講解了 FutureTask
的實現原理後,這裡在對比 submit()
和 execute()
,就比較容易理解了,在submit()
方法中,將 Callable<T>
實現類,封裝成了 FutureTask
, 然後再進行實際的調用:
1.4、總結
核心區別在於 start()
和 run()
, start()是啟動一條新的執行緒
的同時,完成run()
方法,這時候是一個非同步操作
;如果直接執行run()
方法, 則會在當前執行緒
直接執行,是一個同步阻塞操作
。
而 Future
的調用方式,則是一個 同步非阻塞
處理,在提交了任務
後,不阻塞主執行緒的繼續執行,在到了一定時間後,主執行緒可以通過get()
方法,獲取非同步任務
處理結果。
ThreadPoolExecutor
則是維護了一個可復用的執行緒池,解決了資源復用
,性能耗時
的問題, Java執行緒
默認大小為1MB
,執行緒的創建
和銷毀
都會佔用記憶體和GC耗時;而執行緒的無限制創建, 則會帶來CPU負載過高
,每個執行緒分配的時間片很少,導致處理效率低。
2、EventBus
public class JiulingTest {
public static void main(String[] args) throws Exception {
System.out.println("開始");
//使用非同步事件匯流排
EventBus eventBus = new AsyncEventBus(Executors.newFixedThreadPool(10));
// 向上述EventBus對象中註冊一個監聽對象
eventBus.register(new EventListener());
// 使用EventBus發布一個事件,該事件會給通知到所有註冊的監聽者
eventBus.post(new Event("煲水"));
System.out.println("運動");
}
}
// 事件,監聽者監聽的事件的包裝對象
class Event {
//事件動作
public String action;
Event(String action) {
this.action = action;
}
}
// 監聽者
class EventListener {
// 監聽的方法,必須使用註解聲明,且只能有一個參數,實際觸發一個事件的時候會根據參數類型觸發方法
@Subscribe
public void listen(Event event) {
try {
System.out.println("Event listener receive message: " + event.action + " threadName:" + Thread.currentThread().getName());
Thread.sleep(5000);
System.out.println("水開了!");
}catch (Exception e){
e.printStackTrace();
}
}
}
2.1、觀察者模式
在 EventBus
中,通過 @Subscribe
定義了抽象觀察者的行為, 通過入參
區分不同的事件監聽動作
,如上述的示例程式碼中, listen(Event event)
只會觀察這個類的事件。
/**
* Returns all subscribers for the given listener grouped by the type of event they subscribe to.
*/
private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
Class<?> clazz = listener.getClass();
//遍歷 @Subscribe 的方法
for (Method method : getAnnotatedMethods(clazz)) {
Class<?>[] parameterTypes = method.getParameterTypes();
Class<?> eventType = parameterTypes[0];
//然後根據 參數類型,也就是事件類型,進行歸類
methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
}
return methodsInListener;
}
然後在進行事件發布的時候,通過調用 EventBus.post()
方法,遍歷找到所有的監聽方法:
public void post(Object event) {
//從上述歸類的Map 中,找到所有的觀察者方法
Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
if (eventSubscribers.hasNext()) {
//事件分發,具體調用
dispatcher.dispatch(event, eventSubscribers);
} else if (!(event instanceof DeadEvent)) {
// the event had no subscribers and was not itself a DeadEvent
post(new DeadEvent(this, event));
}
}
2.2、AsyncEventBus
在示例程式碼中,我們使用的是 new AsyncEventBus(Executors.newFixedThreadPool(10))
構建的非同步事件匯流排。
由下往上倒推,我們先看 Listern
,是如何執行事件處理方法的,這裡比較好理解
,通過執行緒池完成任務的調用,具體實現是 通過反射的方式調用 @Subscribe
註解的方法。
那麼這裡的 executor
是怎麼來的呢?
this.executor = bus.executor(); //從事件匯流排傳遞過來
回到 EventBus
中,我們可以看到構造函數並沒有提供初始化執行緒池的入口,那麼默認執行緒池的創建,可以跟蹤到
這個執行緒池的 execute
方法,並沒有創建新的執行緒執行 Runnable 方法
,而是使用當前執行緒
執行(具體邏輯參考1.1
)。 因此 EventBus
是不支援非同步事件處理的!
在 dispatchEvent
方法中,比較直接可以看到整體設計中,是支援非同步事件的,我們需要做的就是將 Executor
修改成一個合理的執行緒池, 而 AsyncEventBus
恰恰提供了這個能力。
/**
* Creates a new AsyncEventBus that will use {@code executor} to dispatch events.
*
* @param executor Executor to use to dispatch events. It is the caller's responsibility to shut
* down the executor after the last event has been posted to this event bus.
*/
public AsyncEventBus(Executor executor) {
super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
}
3、Spring Event
Spring Event
與 Event Bus
默認都是同步執行,支援通過設置 Executors
的方式修改成非同步事件。
核心組件:
- 事件類:定義事件,繼承
ApplicationEvent
的類成為一個事件類。 - 發布者:發布事件,通過
ApplicationEventPublisher
發布事件。 - 監聽者:監聽並處理事件,實現
ApplicationListener
介面或者使用@EventListener
註解。
由於程式碼過多,可以直接github 下載 進行閱讀,這裡只貼部分關鍵程式碼:
在發布事件方法:AbstractApplicationContext#publishEvent
會走到 下圖中的 SimpleApplicationEventMulticaster#multicastEvent
執行具體任務的調度。 這裡的設計與 上面的 EventBus
如出一轍,在執行時,通過區分執行緒池進行實際的調度,從而決定 同步|非同步
!
3.1、非同步之ApplicationEventMulticaster
修改 ApplicationEventMulticaster
設置初始執行緒池, 和EventBus
的解決思路一致:
@Order(0)
@Bean
public ApplicationEventMulticaster applicationEventMulticaster() {
SimpleApplicationEventMulticaster eventMulticaster = new SimpleApplicationEventMulticaster();
eventMulticaster.setTaskExecutor(Executors.newFixedThreadPool(10));
return eventMulticaster;
}
在Spring 上下文初始化的時候,會將這一個bean,載入到上下文中,
存在的問題: 由於將整個上下文的 ApplicationEventMulticaster
都替換了,那麼在事件處理的流程上,所有的事件都會以非同步的方式進行,那麼風險的把控就很難做好。不建議,但能用(畢竟經受過考驗
)
3.2、非同步之@Async
通過實現 AsyncConfigurer
介面,自定義執行緒池,對切面方法,執行反射代理
org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke
核心原理
進程間非同步調用
Dubbo 非同步調用
在rpc
框架中,我們普遍使用的都是同步調用
模式,但是在 Dubbo
的底層實現中,反而是以 非同步調用
的方式實現的。先來簡單看看調用鏈路
:
核心程式碼在
com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
消息隊列非同步解耦
在介紹 EventBus
的時候, 我查看了很多文章,都將EventBus
的設計模式
描述為發布-訂閱
模式。首先這個描述是錯誤的,然後我們來對比一下他們的區別:
從表面上看:
- 觀察者模式里,只有兩個角色 —— 觀察者 + 被觀察者
- 而發布訂閱模式里,卻不僅僅只有發布者和訂閱者兩個角色,還有一個經常被我們忽略的 —— 經紀人Broker
往更深層次講:
- 觀察者和被觀察者,是松耦合的關係
- 發布者和訂閱者,則完全不存在耦合
發布-訂閱
模式:
消息隊列
能夠幫我們做到解耦
的效果,通過消息中間件,如 RocketMQ
,kafka
和rabbitMQ
等; 完成消息的接收和推送,從而達到非同步處理
的效果。
點關注,不迷路
圖片地址:draw.io原圖
好了各位,以上就是這篇文章的全部內容了,我後面會每周都更新幾篇高品質的大廠面試和常用技術棧相關的文章。感謝大夥能看到這裡,如果這個文章寫得還不錯, 求三連!!! 感謝各位的支援和認可,我們下篇文章見!
我是 九靈
,有需要交流的童鞋可以關注公眾號:Java 補習課
,掌握第一手資料! 如果本篇部落格有任何錯誤,請批評指教,不勝感激 !