《高性能利器》–非同步調用實現原理詳解!

概述

高可用系列文章 算是告一段落了,主要講了 限流熔斷削峰Sentinel實戰

接下來進入到大家比較喜歡的 高性能系列,主題內容包括,消息隊列快取分散式部署架構等,在上一篇文章- # 秒殺系統架構圖該怎麼畫?手把手教你! ,講解了部落客凄慘的經歷,因此在學習相關技術的時候,我們要將其運用到我們實際的項目中,在高性能篇結束後,將進入架構圖2.0版本

image.png

什麼是非同步

同步調用:調用方在調用過程中,持續等待返回結果。
 
非同步調用:調用方在調用過程中,不直接等待返回結果,而是執行其他任務,結果返回形式通常為回調函數。

image.png

脫離IO,單獨討論同步非同步,我們更加容易去理解它的原理,同步和非同步其實屬於一種通訊機制,表達的是,我們在通訊過程中,是主動去詢問,還是等對方主動回饋。體現在同步(主動)和非同步(被動)上。

進程內非同步調用

1、Thread

進程和執行緒:進程是資源分配的最小單位,執行緒是CPU調度的最小單位

Java進程內最簡單的非同步調用方式,就是通過 new Thread().start() 的方式,啟動新的執行緒進行任務的執行(CPU調度)。

public static void main(String[] args) {
    System.out.println("煲水");
    //創建新的執行緒
    Thread thread1= new Thread(()->{
        try {
            Thread.sleep(5000);
            System.out.println("水開了,"+Thread.currentThread().getName());
        }catch (Exception e){
            e.printStackTrace();
        }
    });
    thread1.start();
    System.out.println("運動");
}

1.1、start() 和 run()

在上述實例程式碼中,我們雖然採用了實現 Runnable 介面的方式,進行新執行緒的實現,但是在方法啟動時,並沒有使用 run() 方法,而是使用了 start() 方法。

run():使用當前執行緒執行 run()方法調用,可以理解時同步調用

start() 方法在調用時,在程式碼邏輯中,會調用到一個本地方法 start0
image.png

下載 JDK源碼 後,可以看到 Thread 類有個 registerNatives 本地方法,該方法主要的作用就是註冊一些本地方法供 Thread 類使用,如 start0(),stop0() 等等,可以說,所有操作本地執行緒的本地方法都是由它註冊的。

image.png

image.png

可以看出 Java 執行緒 調用 start->start0 的方法,實際上會調用到 JVM_StartThread 方法,通過調用 new JavaThread(&thread_entry,sz) 完成執行緒的創建。

jvm.cpp 中,有如下程式碼段:

image.png

在創建完執行緒後,通過 thread_entry 完成 run() 方法的調用

image.png

1.2、Future

Future 的調用方式,屬於同步非阻塞, 主要原因在於,在獲取非同步執行緒處理結果時,需要主執行緒主動去獲取,非同步執行緒並沒有通過主動通知的方式,將數據結構進行更新回調

public static void main(String[] args) throws Exception {
    System.out.println("煲水");
    FutureTask<String> futureTask = new FutureTask(()->{
        try {
            Thread.sleep(5000);
            System.out.println("水開了,"+Thread.currentThread().getName());
        }catch (Exception e){
            e.printStackTrace();
        }
        return "water";
    });
    //創建新的執行緒
    Thread thread1= new Thread(futureTask);
    thread1.start();
    System.out.println("運動");
    Thread.sleep(3000);
    //阻塞等待數據
    String result= futureTask.get(5, TimeUnit.SECONDS);
    System.out.println("喝水," + result);
}

Future 的實現原理

類的繼承關係圖如下,可以看到 FutureTask,實現了 Runnable 介面,那麼在重寫的run() 方法中,可以看到,在調用 call() 方法獲取到結果後,通過CAS的方式,更新到成員變數中。

image.png

任務調用結果更新:

image.png

1.3、ThreadPoolExecutor

public static void main(String[] args) throws Exception {
    ExecutorService executors = Executors.newFixedThreadPool(10);
    System.out.println("煲水");
    Future<String> future = executors.submit(() -> {
        try {
            Thread.sleep(5000);
            System.out.println("水開了," + Thread.currentThread().getName());
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "water";
    });
    System.out.println("運動");

    String result = future.get(5, TimeUnit.SECONDS);
    System.out.println("喝水," + result);
}

上面講解了 FutureTask 的實現原理後,這裡在對比 submit()execute(),就比較容易理解了,在submit()方法中,將 Callable<T> 實現類,封裝成了 FutureTask, 然後再進行實際的調用:

image.png

image.png

1.4、總結

核心區別在於 start()run() , start()是啟動一條新的執行緒的同時,完成run()方法,這時候是一個非同步操作;如果直接執行run()方法, 則會在當前執行緒直接執行,是一個同步阻塞操作

Future 的調用方式,則是一個 同步非阻塞處理,在提交了任務後,不阻塞主執行緒的繼續執行,在到了一定時間後,主執行緒可以通過get() 方法,獲取非同步任務處理結果。

ThreadPoolExecutor 則是維護了一個可復用的執行緒池,解決了資源復用性能耗時的問題, Java執行緒默認大小為1MB,執行緒的創建銷毀都會佔用記憶體和GC耗時;而執行緒的無限制創建, 則會帶來CPU負載過高,每個執行緒分配的時間片很少,導致處理效率低。

2、EventBus

public class JiulingTest {
    public static void main(String[] args) throws Exception {
        System.out.println("開始");
        //使用非同步事件匯流排
        EventBus eventBus  = new AsyncEventBus(Executors.newFixedThreadPool(10));
        // 向上述EventBus對象中註冊一個監聽對象
        eventBus.register(new EventListener());
        // 使用EventBus發布一個事件,該事件會給通知到所有註冊的監聽者
        eventBus.post(new Event("煲水"));
        System.out.println("運動");
    }
}
// 事件,監聽者監聽的事件的包裝對象
class Event {
    //事件動作
    public String action;

    Event(String action) {
        this.action = action;
    }
}

// 監聽者
 class EventListener {
    // 監聽的方法,必須使用註解聲明,且只能有一個參數,實際觸發一個事件的時候會根據參數類型觸發方法
    @Subscribe
    public void listen(Event event) {
        try {
            System.out.println("Event listener receive message:  " + event.action + " threadName:" + Thread.currentThread().getName());
            Thread.sleep(5000);
            System.out.println("水開了!");
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

2.1、觀察者模式

image.png

EventBus 中,通過 @Subscribe 定義了抽象觀察者的行為, 通過入參區分不同的事件監聽動作,如上述的示例程式碼中, listen(Event event) 只會觀察這個類的事件。

/**
 * Returns all subscribers for the given listener grouped by the type of event they subscribe to.
 */
private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
  Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
  Class<?> clazz = listener.getClass();
  //遍歷 @Subscribe 的方法
  for (Method method : getAnnotatedMethods(clazz)) {
    Class<?>[] parameterTypes = method.getParameterTypes();
    Class<?> eventType = parameterTypes[0];
    //然後根據 參數類型,也就是事件類型,進行歸類
    methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
  }
  return methodsInListener;
}

然後在進行事件發布的時候,通過調用 EventBus.post() 方法,遍歷找到所有的監聽方法:

public void post(Object event) {
//從上述歸類的Map 中,找到所有的觀察者方法
  Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
  if (eventSubscribers.hasNext()) {
  //事件分發,具體調用
    dispatcher.dispatch(event, eventSubscribers);
  } else if (!(event instanceof DeadEvent)) {
    // the event had no subscribers and was not itself a DeadEvent
    post(new DeadEvent(this, event));
  }
}

2.2、AsyncEventBus

在示例程式碼中,我們使用的是 new AsyncEventBus(Executors.newFixedThreadPool(10)) 構建的非同步事件匯流排。

由下往上倒推,我們先看 Listern ,是如何執行事件處理方法的,這裡比較好理解,通過執行緒池完成任務的調用,具體實現是 通過反射的方式調用 @Subscribe 註解的方法。

image.png

那麼這裡的 executor 是怎麼來的呢?

this.executor = bus.executor(); //從事件匯流排傳遞過來

回到 EventBus 中,我們可以看到構造函數並沒有提供初始化執行緒池的入口,那麼默認執行緒池的創建,可以跟蹤到

image.png

這個執行緒池的 execute 方法,並沒有創建新的執行緒執行 Runnable 方法,而是使用當前執行緒執行(具體邏輯參考1.1)。 因此 EventBus 是不支援非同步事件處理的!

image.png

dispatchEvent 方法中,比較直接可以看到整體設計中,是支援非同步事件的,我們需要做的就是將 Executor 修改成一個合理的執行緒池, 而 AsyncEventBus 恰恰提供了這個能力。

/**
 * Creates a new AsyncEventBus that will use {@code executor} to dispatch events.
 *
 * @param executor Executor to use to dispatch events. It is the caller's responsibility to shut
 *     down the executor after the last event has been posted to this event bus.
 */
public AsyncEventBus(Executor executor) {
  super("default", executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
}

3、Spring Event

Spring Event Event Bus 默認都是同步執行,支援通過設置 Executors 的方式修改成非同步事件。

image.png

核心組件:

  • 事件類:定義事件,繼承ApplicationEvent的類成為一個事件類。
  • 發布者:發布事件,通過ApplicationEventPublisher發布事件。
  • 監聽者:監聽並處理事件,實現ApplicationListener介面或者使用@EventListener註解。

由於程式碼過多,可以直接github 下載 進行閱讀,這裡只貼部分關鍵程式碼:

在發布事件方法:AbstractApplicationContext#publishEvent
會走到 下圖中的 SimpleApplicationEventMulticaster#multicastEvent 執行具體任務的調度。 這裡的設計與 上面的 EventBus 如出一轍,在執行時,通過區分執行緒池進行實際的調度,從而決定 同步|非同步!

image.png

3.1、非同步之ApplicationEventMulticaster

修改 ApplicationEventMulticaster 設置初始執行緒池, 和EventBus 的解決思路一致:

@Order(0)
@Bean
public ApplicationEventMulticaster applicationEventMulticaster() {
    SimpleApplicationEventMulticaster eventMulticaster = new SimpleApplicationEventMulticaster();
    eventMulticaster.setTaskExecutor(Executors.newFixedThreadPool(10));
    return eventMulticaster;
}

在Spring 上下文初始化的時候,會將這一個bean,載入到上下文中,

image.png

存在的問題: 由於將整個上下文的 ApplicationEventMulticaster都替換了,那麼在事件處理的流程上,所有的事件都會以非同步的方式進行,那麼風險的把控就很難做好。不建議,但能用(畢竟經受過考驗

3.2、非同步之@Async

通過實現 AsyncConfigurer 介面,自定義執行緒池,對切面方法,執行反射代理

image.png

image.png

org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke

核心原理

image.png

進程間非同步調用

Dubbo 非同步調用

rpc框架中,我們普遍使用的都是同步調用模式,但是在 Dubbo 的底層實現中,反而是以 非同步調用的方式實現的。先來簡單看看調用鏈路

image.png

核心程式碼在

com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke

image.png

消息隊列非同步解耦

在介紹 EventBus 的時候, 我查看了很多文章,都將EventBus設計模式 描述為發布-訂閱 模式。首先這個描述是錯誤的,然後我們來對比一下他們的區別:

從表面上看:

  • 觀察者模式里,只有兩個角色 —— 觀察者 + 被觀察者
  • 而發布訂閱模式里,卻不僅僅只有發布者和訂閱者兩個角色,還有一個經常被我們忽略的 —— 經紀人Broker

往更深層次講:

  • 觀察者和被觀察者,是松耦合的關係
  • 發布者和訂閱者,則完全不存在耦合

發布-訂閱模式:

image.png

消息隊列能夠幫我們做到解耦的效果,通過消息中間件,如 RocketMQkafkarabbitMQ等; 完成消息的接收和推送,從而達到非同步處理的效果。

點關注,不迷路

圖片地址:draw.io原圖

好了各位,以上就是這篇文章的全部內容了,我後面會每周都更新幾篇高品質的大廠面試和常用技術棧相關的文章。感謝大夥能看到這裡,如果這個文章寫得還不錯, 求三連!!! 感謝各位的支援和認可,我們下篇文章見!

我是 九靈 ,有需要交流的童鞋可以關注公眾號:Java 補習課,掌握第一手資料! 如果本篇部落格有任何錯誤,請批評指教,不勝感激 !