虛擬線程 – VirtualThread源碼透視

前提

JDK192022-09-20發佈GA版本,該版本提供了虛擬線程的預覽功能。下載JDK19之後翻看了一下有關虛擬線程的一些源碼,跟早些時候的Loom項目構建版本基本並沒有很大出入,也跟第三方JDK如鵝廠的Kona虛擬線程實現方式基本一致,這裡分析一下虛擬線程設計與源碼實現。

Platform Thread與Virtual Thread

因為引入了虛擬線程,原來JDK存在java.lang.Thread類,俗稱線程,為了更好地區分虛擬線程和原有的線程類,引入了一個全新類java.lang.VirtualThreadThread類的一個子類型),直譯過來就是”虛擬線程”。

  • 題外話:在Loom項目早期規劃裏面,核心API其實命名為Fiber,直譯過來就是”纖程”或者”協程”,後來成為了廢案,在一些歷史提交的Test類或者文檔中還能看到類似於下面的代碼:
// java.lang.Fiber
Fiber f = Fiber.execute({
    out.println("Good morning");
    readLock.lock();
    try{
        out.println("Good night");
    } finally{
        readLock.unlock();
    }
    out.println("Good night");
});

Thread在此基礎上做了不少兼容性工作。此外,還應用了建造者模式引入了線程建造器,提供了靜態工廠方法Thread#ofPlatform()Thread#ofVirtual()分別用於實例化Thread(工廠)建造器和VirtualThread(工廠)建造器,顧名思義,兩種建造器分別用於創建Thread或者VirtualThread,例如:

// demo-1 build platform thread
Thread platformThread = Thread.ofPlatform().daemon().name("worker").unstarted(runnable);

// demo-2 create platform thread factory
ThreadFactory platformThreadFactory = Thread.ofPlatform().daemon().name("worker-", 0).factory();

// demo-3 build virtual thread
Thread virtualThread = Thread.ofVirtual().name("virtual-worker").unstarted(runnable);

// demo-4 create virtual thread factory
ThreadFactory virtualThreadFactory = Thread.ofVirtual().name("virtual-worker-", 0).factory();

更新的JDK文檔中也把原來的Thread稱為Platform Thread,可以更明晰地與Virtual Thread區分開來。這裡Platform Thread直譯為”平台線程”,其實就是”虛擬線程”出現之前的老生常談的”線程”。

後文會把Platform Thread稱為平台線程,Virtual Thread稱為虛擬線程,或者直接用其英文名稱

那麼平台線程與虛擬線程的聯繫和區別是什麼?JDK中的每個java.lang.Thread實例也就是每個平台線程實例都在底層操作系統線程上運行Java代碼,並且平台線程在運行代碼的整個生命周期內捕獲系統線程。可以得出一個結論,平台線程與底層系統線程是一一對應的,平台線程實例本質是由系統內核的線程調度程序進行調度,並且平台線程的總數量受限於系統線程的總數量。

vt-source-code-1

總的來說,平台線程有下面的一些特點或者說限制:

  • 資源有限導致系統線程總量有限,進而導致與系統線程一一對應的平台線程有限
  • 平台線程的調度依賴於系統的線程調度程序,當平台線程創建過多,會消耗大量資源用於處理線程上下文切換
  • 每個平台線程都會開闢一塊私有的棧空間,大量平台線程會佔據大量內存

這些限制導致開發者不能極大量地創建平台線程,為了滿足性能需要,需要引入池化技術、添加任務隊列構建消費者-生產者模式等方案去讓平台線程適配多變的現實場景。顯然,開發者們迫切需要一種輕量級線程實現,剛好可以彌補上面提到的平台線程的限制,這種輕量級線程可以滿足:

  • 可以大量創建,例如十萬級別、百萬級別,而不會佔據大量內存
  • JVM進行調度和狀態切換,並且與系統線程”鬆綁”
  • 用法與原來平台線程差不多,或者說盡量兼容平台線程現存的API

Loom項目中開發的虛擬線程就是為了解決這個問題,看起來它的運行示意圖如下:

vt-source-code-2

當然,平台線程不是簡單地與虛擬線程進行1:N的綁定,後面的章節會深入分析虛擬線程的運行原理。

虛擬線程實現原理

虛擬線程是一種輕量級(用戶模式)線程,這種線程是由Java虛擬機調度,而不是操作系統。虛擬線程佔用空間小,任務切換開銷幾乎可以忽略不計,因此可以極大量地創建和使用。總體來看,虛擬線程實現如下:

virtual thread = continuation + scheduler

虛擬線程會把任務(一般是java.lang.Runnable)包裝到一個Continuation實例中:

  • 當任務需要阻塞掛起的時候,會調用Continuationyield操作進行阻塞
  • 當任務需要解除阻塞繼續執行的時候,Continuation會被繼續執行

Scheduler也就是執行器,會把任務提交到一個載體線程池中執行:

  • 執行器是java.util.concurrent.Executor的子類
  • 虛擬線程框架提供了一個默認的ForkJoinPool用於執行虛擬線程任務

下文會把carrier thread稱為”載體線程”,指的是負責執行虛擬線程中任務的平台線程,或者說運行虛擬線程的平台線程稱為它的載體線程

操作系統調度系統線程,而Java平台線程與系統線程一一映射,所以平台線程被操作系統調度,但是虛擬線程是由JVM調度。JVM把虛擬線程分配給平台線程的操作稱為mount(掛載),反過來取消分配平台線程的操作稱為unmount(卸載):

  • mount操作:虛擬線程掛載到平台線程,虛擬線程中包裝的Continuation棧數據幀或者引用棧數據會被拷貝到平台線程的線程棧,這是一個從堆複製到棧的過程
  • unmount操作:虛擬線程從平台線程卸載,大多數虛擬線程中包裝的Continuation棧數據幀會留在堆內存中

這個mount -> run -> unmount過程用偽代碼表示如下:

mount();
try {
    Continuation.run();
} finally {
    unmount();
}

Java代碼的角度來看,虛擬線程和它的載體線程暫時共享一個OS線程實例這個事實是不可見,因為虛擬線程的堆棧跟蹤和線程本地變量與平台線程是完全隔離的。JDK中專門是用了一個FIFO模式的ForkJoinPool作為虛擬線程的調度程序,從這個調度程序看虛擬線程任務的執行流程大致如下:

  • 調度器(線程池)中的平台線程等待處理任務

vt-source-code-5

  • 一個虛擬線程被分配平台線程,該平台線程作為運載線程執行虛擬線程中的任務

vt-source-code-6

  • 虛擬線程運行其Continuation,從而執行基於Runnable包裝的用戶任務

vt-source-code-7

  • 虛擬線程任務執行完成,標記Continuation終結,標記虛擬線程為終結狀態,清空一些上下文變量,運載線程”返還”到調度器(線程池)中作為平台線程等待處理下一個任務

vt-source-code-5

上面是描述一般的虛擬線程任務執行情況,在執行任務時候首次調用Continuation#run()獲取鎖(ReentrantLock)的時候會觸發Continuationyield操作讓出控制權,等待虛擬線程重新分配運載線程並且執行,見下面的代碼:

public class VirtualThreadLock {

    public static void main(String[] args) throws Exception {
        ReentrantLock lock = new ReentrantLock();
        Thread.startVirtualThread(() -> {
            lock.lock();     // <------ 這裡確保鎖已經被另一個虛擬線程持有
        });
        Thread.sleep(1000);
        Thread.startVirtualThread(() -> {
            System.out.println("first");
            lock.lock();
            try {
                System.out.println("second");
            } finally {
                lock.unlock();
            }
            System.out.println("third");
        });
        Thread.sleep(Long.MAX_VALUE);
    }
}
  • 虛擬線程中任務執行時候首次調用Continuation#run()執行了部分任務代碼,然後嘗試獲取鎖,會導致Continuationyield操作讓出控制權(任務切換),也就是unmount,運載線程棧數據會移動到Continuation棧的數據幀中,保存在堆內存,虛擬線程任務完成(但是虛擬線程沒有終結,同時其Continuation也沒有終結和釋放),運載線程被釋放到執行器中等待新的任務;如果Continuationyield操作失敗,則會對運載線程進行park調用,阻塞在運載線程上

vt-source-code-8

  • 當鎖持有者釋放鎖之後,會喚醒虛擬線程獲取鎖(成功後),虛擬線程會重新進行mount,讓虛擬線程任務再次執行,有可能是分配到另一個運載線程中執行,Continuation棧會的數據幀會被恢復到運載線程棧中,然後再次調用Continuation#run()恢復任務執行:

vt-source-code-9

  • 最終虛擬線程任務執行完成,標記Continuation終結,標記虛擬線程為終結狀態,清空一些上下文變量,運載線程”返還”到調度器(線程池)中作為平台線程等待處理下一個任務

Continuation組件十分重要,它既是用戶真實任務的包裝器,也是任務切換虛擬線程與平台線程之間數據轉移的一個句柄,它提供的yield操作可以實現任務上下文的中斷和恢復。由於Continuation被封閉在java.base/jdk.internal.vm下,可以通過增加編譯參數--add-exports java.base/jdk.internal.vm=ALL-UNNAMED暴露對應的功能,從而編寫實驗性案例,IDEA中可以按下圖進行編譯參數添加:

vt-source-code-10

然後編寫和運行下面的例子:

import jdk.internal.vm.Continuation;
import jdk.internal.vm.ContinuationScope;

public class ContinuationDemo {

    public static void main(String[] args) {
        ContinuationScope scope = new ContinuationScope("scope");
        Continuation continuation = new Continuation(scope, () -> {
            System.out.println("Running before yield");
            Continuation.yield(scope);
            System.out.println("Running after yield");
        });
        System.out.println("First run");
        // 第一次執行Continuation.run
        continuation.run();
        System.out.println("Second run");
        // 第二次執行Continuation.run
        continuation.run();
        System.out.println("Done");
    }
}

// 運行代碼,神奇的結果出現了
First run
Running before yield
Second run
Running after yield
Done

這裡可以看出Continuation的奇妙之處,Continuation實例進行yield調用後,再次調用其run方法就可以從yield的調用之處往下執行,從而實現了程序的中斷和恢復。

源碼分析

主要包括:

  • Continuation
  • VirtualThread
  • 線程建造器

Continuation

Continuation直譯為”連續”,一般來說表示一種語言構造,使語言可以在任意點保存執行狀態並且在之後的某個點返回。在JDK中對應類jdk.internal.vm.Continuation,這個類只有一句類注釋A one-shot delimited continuation,直譯為一個只能執行一次的回調函數。由於Continuation的成員和方法缺少詳細的注釋,並且大部分功能由JVM實現,這裡只能閱讀其一些骨幹源碼和上一小節編寫的Continuation相關例子去了解其實現(筆者C語言比較薄弱,有興趣的可以翻閱JVM的源碼)。先看成員變量和構造函數:

// 判斷是否需要保留當前線程的本地緩存,由系統參數jdk.preserveExtentLocalCache決定
private static final boolean PRESERVE_EXTENT_LOCAL_CACHE;

// 真正要被執行的任務實例
private final Runnable target;

// 標識Continuation的範圍,
private final ContinuationScope scope;

// Continuation的父節點,如果為空的時候則為本地線程棧
private Continuation parent;

// Continuation的子節點,非空時候說明在子Continuation中進行了yield操作
private Continuation child;

// 猜測為Continuation棧結構,由JVM管理,無法得知其真實作用
private StackChunk tail;

// 標記Continuation是否已經完成
private boolean done;

// 標記是否進行了mount操作
private volatile boolean mounted = false;

// yield操作時候設置的信息
private Object yieldInfo;

// 標記一個未掛載的Continuation是否通過強制搶佔式卸載
private boolean preempted;

// 保留當前線程的本地緩存的副本
private Object[] extentLocalCache;

// 構造函數,要求傳入範圍和任務包裝實例
public Continuation(ContinuationScope scope, Runnable target) {
    this.scope = scope;
    this.target = target;
}

Continuation是一個雙向鏈表設計,它的唯一一組構造參數是ContinuationScopeRunnable

vt-source-code-11

這裡不深入研究內部StackChunkPinned等實現,直接看runenter系列方法和yield方法:

// Continuation.run()
public final void run() {
    // 設置死循環
    while (true) {
        // 進行mount操作
        mount();
        JLA.setExtentLocalCache(extentLocalCache);
        // 如果Continuation已完成則拋出異常
        if (done)
            throw new IllegalStateException("Continuation terminated");
        // 獲取當前虛擬線程分配的運載線程
        Thread t = currentCarrierThread();
        if (parent != null) {
            if (parent != JLA.getContinuation(t))
                throw new IllegalStateException();
        } else
            this.parent = JLA.getContinuation(t);
        // 運載線程設置當前Continuation實例
        JLA.setContinuation(t, this);

        try {
            // 判斷ContinuationScope是否虛擬線程範圍
            boolean isVirtualThread = (scope == JLA.virtualThreadContinuationScope());
            if (!isStarted()) { // is this the first run? (at this point we know !done)
                // 激活enter系列方法,標記isContinue為false,標記是否虛擬線程範圍
                enterSpecial(this, false, isVirtualThread);
            } else {
                assert !isEmpty();
                // 激活enter系列方法,標記isContinue為true,標記是否虛擬線程範圍
                enterSpecial(this, true, isVirtualThread);
            }
        } finally {
            // 設置內存屏障
            fence();
            try {
                assert isEmpty() == done : "empty: " + isEmpty() + " done: " + done + " cont: " + Integer.toHexString(System.identityHashCode(this));
                // 當前Continuation執行完成後,把運載線程的Continuation指向父Continuation
                JLA.setContinuation(currentCarrierThread(), this.parent);
                if (parent != null)
                    parent.child = null;
                // 進行後置的yield清理工作
                postYieldCleanup();
                // 進行unmount操作
                unmount();
                // 判斷是否需要保留當前線程的本地緩存並處理
                if (PRESERVE_EXTENT_LOCAL_CACHE) {
                    extentLocalCache = JLA.extentLocalCache();
                } else {
                    extentLocalCache = null;
                }
                JLA.setExtentLocalCache(null);
            } catch (Throwable e) { e.printStackTrace(); System.exit(1); }
        }
        // we're now in the parent continuation

        assert yieldInfo == null || yieldInfo instanceof ContinuationScope;
        // 父Continuation的yieldInfo緩存當前的scope實例,清空當前Continuation的父節點和yieldInfo
        if (yieldInfo == null || yieldInfo == scope) {
            this.parent = null;
            this.yieldInfo = null;
            // 這個位置是死循環的唯一跳出點
            return;
        } else {
            // 執行到這個位置說明在當前是子Continuation並且進行了yield操作,需要跳轉到父Continuation進行yield操作
            parent.child = this;
            parent.yield0((ContinuationScope)yieldInfo, this);
            parent.child = null;
        }
    }
}

// Continuation.enter()系列方法

// 這是一個native方法,它最終會根據判斷回調到enter()方法
private native static void enterSpecial(Continuation c, boolean isContinue, boolean isVirtualThread);

// Continuation的入口方法,用戶任務回調的入口
@DontInline
@IntrinsicCandidate
private static void enter(Continuation c, boolean isContinue) {
    // This method runs in the "entry frame".
    // A yield jumps to this method's caller as if returning from this method.
    try {
        c.enter0();
    } finally {
        c.finish();
    }
}

// 真正任務包裝器執行的回調方法
private void enter0() {
    target.run();
}

// Continuation完成,標記done為true
private void finish() {
    done = true;
    assert isEmpty();
}


// Continuation.yield()方法,靜態方法
public static boolean yield(ContinuationScope scope) {
    // 獲取當前運載線程的Continuation實例
    Continuation cont = JLA.getContinuation(currentCarrierThread());
    Continuation c;
    // 基於Continuation實例當前向父節點遍歷,直到匹配虛擬線程類型的ContinuationScope的Continuation,如果沒有匹配的Continuation會拋出異常中斷流程
    for (c = cont; c != null && c.scope != scope; c = c.parent)
        ;
    if (c == null)
        throw new IllegalStateException("Not in scope " + scope);
    // 把當前的Continuation掛起到給定的ContinuationScope
    return cont.yield0(scope, null);
}

// 透過上下文猜測是當前的Continuation實例掛起到給定的ContinuationScope
private boolean yield0(ContinuationScope scope, Continuation child) {
    // 強制搶佔式卸載標記為false
    preempted = false;
    // 如果當前Continuation實例的yieldInfo不等於傳入的ContinuationScope實例,則進行更新,相等的情況下yieldInfo會保持是一個空值
    if (scope != this.scope)
        this.yieldInfo = scope;
    // 最終的yield調用,最終當前Continuation就是阻塞在此方法,從下文源碼猜測,當該方法喚醒後,res值為0的時候,當前Continuation實例會繼續執行,返回其他值的時候則會打印pined線程棧
    int res = doYield();
    // 放置內存屏障防止指令重排,後面注釋提到是防止編譯器進行某些轉換
    U.storeFence(); // needed to prevent certain transformations by the compiler

    assert scope != this.scope || yieldInfo == null : "scope: " + scope + " this.scope: " + this.scope + " yieldInfo: " + yieldInfo + " res: " + res;
    assert yieldInfo == null || scope == this.scope || yieldInfo instanceof Integer : "scope: " + scope + " this.scope: " + this.scope + " yieldInfo: " + yieldInfo + " res: " + res;

    if (child != null) { // TODO: ugly <----- 這個位置還有一句吐槽的代碼注釋:醜陋的代碼
        if (res != 0) {
            child.yieldInfo = res;
        } else if (yieldInfo != null) {
            assert yieldInfo instanceof Integer;
            child.yieldInfo = yieldInfo;
        } else {
            child.yieldInfo = res;
        }
        this.yieldInfo = null;
    } else {
        if (res == 0 && yieldInfo != null) {
            res = (Integer)yieldInfo;
        }
        this.yieldInfo = null;

        if (res == 0)
            // Continuation實例繼續執行前回調
            onContinue();
        else
            // Continuation固定在運載線程前回調,res是pined的級別
            onPinned0(res);
    }
    assert yieldInfo == null;
    // 返回布爾值結果表示當前Continuation實例是否會繼續執行
    return res == 0;
}

// 最終的yield調用,看實現是拋出異常,猜測是由JVM實現
@IntrinsicCandidate
private static int doYield() { throw new Error("Intrinsic not installed"); }

說實話,Continuation源碼的可讀性比想像中低,連代碼注釋也留下了”醜陋的”這句吐槽。通過上面源碼分析和上一節Continuation的一個例子,可以得知Continuation#yield()可以讓程序代碼中斷,然後再次調用Continuation#run()可以從上一個中斷位置繼續執行,JVM在這個過程中為使用者屏蔽了Continuation和運行此Continuation的平台線程之間的交互細節,讓使用者可以專註實際的任務開發即可。

VirtualThread

前面花了不少篇幅介紹Continuation,它是一個全新的API。已有的JUC類庫已經十分完善,如果可以把Continuation融入到已有的JUC體系,那麼就可以通過線程池技術去管理運載線程,原有的大多數並發相關API也能直接在協程體系中使用。從這個背景來看,創造一個Thread類的全新子類用於融合JUCContinuation是十分合適的,這樣通過很小的改造成本就能通過Java繼承特性把這個全新子類適配JUC體系,也能擴展一些API讓它適配協程新引入的特性,這個全新的子類就是java.lang.VirtualThread

vt-source-code-12

VirtualThread類的繼承體系如下:

package java.lang;

final class VirtualThread extends BaseVirtualThread {
  // ...
}

package java.lang;

sealed abstract class BaseVirtualThread extends Thread
        permits VirtualThread, ThreadBuilders.BoundVirtualThread {
  // ... 
}

VirtualThreadBaseVirtualThread的子類,而BaseVirtualThread是一個”密封類”,它是Thread的子類,只對VirtualThreadThreadBuilders.BoundVirtualThread開放,並且VirtualThread包私有訪問權限的同時用final關鍵字修飾,無法被繼承。接着看VirtualThread的成員變量和構造函數:

// java.lang.VirtualThread

// Unsafe實例
private static final Unsafe U = Unsafe.getUnsafe();

// 虛擬線程的ContinuationScope靜態常量
private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");

// 調度器,或者說執行器,默認就是用此調度器運行虛擬線程
private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();

// 調度線程池實例,用於喚醒帶超時阻塞的虛擬線程實例,主要用於sleep的喚醒
private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler();

// pin模式,也就是pined thread的跟蹤模式,決定打印堆棧的詳細程度,來自於系統參數jdk.tracePinnedThreads,full表示詳細,short表示簡略
private static final int TRACE_PINNING_MODE = tracePinningMode();

// 下面幾個都是成員地址,用於Unsafe直接操作成員
private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread");
private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination");

// 調度器實例
private final Executor scheduler;

// Continuation實例
private final Continuation cont;

// Continuation實例的Runnable包裝實例
private final Runnable runContinuation;

// 虛擬線程狀態,這個值由JVM訪問和修改
private volatile int state;

// 下面的狀態集合
private static final int NEW      = 0;
private static final int STARTED  = 1;
private static final int RUNNABLE = 2;     // runnable-unmounted
private static final int RUNNING  = 3;     // runnable-mounted
private static final int PARKING  = 4;
private static final int PARKED   = 5;     // unmounted
private static final int PINNED   = 6;     // mounted
private static final int YIELDING = 7;     // Thread.yield
private static final int TERMINATED = 99;  // final state

// 虛擬線程unmount後可以從調度過程中掛起的狀態
private static final int SUSPENDED = 1 << 8;
private static final int RUNNABLE_SUSPENDED = (RUNNABLE | SUSPENDED);
private static final int PARKED_SUSPENDED   = (PARKED | SUSPENDED);

// park操作許可
private volatile boolean parkPermit;

// 運載線程實例
private volatile Thread carrierThread;

// 終結倒數柵欄實例,主要用於join操作
private volatile CountDownLatch termination;

// 唯一構造函數
VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) {
    // 默認標記bound為false,當bound為true的時候標記為綁定到系統線程
    super(name, characteristics, /*bound*/ false);
    Objects.requireNonNull(task);
    // 如果傳入的調度器實例非空則直接使用
    // 否則,如果父線程是虛擬線程,則使用父虛擬線程的調度器實例
    // 如果傳入的調度器實例為空,父線程為平台線程,那麼使用默認的調度器
    // choose scheduler if not specified
    if (scheduler == null) {
        Thread parent = Thread.currentThread();
        if (parent instanceof VirtualThread vparent) {
            scheduler = vparent.scheduler;
        } else {
            scheduler = DEFAULT_SCHEDULER;
        }
    }
    // 賦值調度器
    this.scheduler = scheduler;
    // 封裝和初始化Continuation
    this.cont = new VThreadContinuation(this, task);
    // 初始化Continuation的Runnable包裝器,最終提交到調度器中執行
    this.runContinuation = this::runContinuation;
}

// 虛擬線程Continuation的專有子類,默認為ContinuationScope("VirtualThreads"),從而實現Continuation.enter()執行時候實際上執行的是VirtualThread.run()方法
// 也就是 Runnable.run()[runContinuation by carrier thread from executor] --> Continuation.run() --> Continuation.enter() --> VirtualThread.run() --> Runnable.run()[user task]
private static class VThreadContinuation extends Continuation {

    VThreadContinuation(VirtualThread vthread, Runnable task) {
        super(VTHREAD_SCOPE, () -> vthread.run(task));
    }

    // pin之前回調的方法,基於TRACE_PINNING_MODE的返回值決定pinned線程棧的打印詳略
    @Override
    protected void onPinned(Continuation.Pinned reason) {
        if (TRACE_PINNING_MODE > 0) {
            boolean printAll = (TRACE_PINNING_MODE == 1);
            PinnedThreadPrinter.printStackTrace(System.out, printAll);
        }
    }
}

// 在當前線程上運行或繼續Continuation的執行,必須由平台線程運行此方法,最終會封裝為Runnble包裝器提交到執行器中運行
private void runContinuation() {
    // the carrier must be a platform thread
    if (Thread.currentThread().isVirtual()) {
        throw new WrongThreadException();
    }

    // set state to RUNNING
    boolean firstRun;
    int initialState = state();
    // 當前為STARTED狀態並且CAS更新為RUNNING狀態則標記首次運行為true
    if (initialState == STARTED && compareAndSetState(STARTED, RUNNING)) {
        // first run
        firstRun = true;
    } else if (initialState == RUNNABLE && compareAndSetState(RUNNABLE, RUNNING)) {
        // 當前為RUNNABLE狀態並且CAS更新為RUNNING狀態則標記首次運行為false,並且設置park許可為false
        // consume parking permit
        setParkPermit(false);
        firstRun = false;
    } else {
        // not runnable
        return;
    }

    // notify JVMTI before mount
    if (notifyJvmtiEvents) notifyJvmtiMountBegin(firstRun);

    try {
        // 執行Continuation.run()
        cont.run();
    } finally {
        // Continuation執行完成,回調鉤子方法afterTerminate
        if (cont.isDone()) {
            afterTerminate(/*executed*/ true);
        } else {
            // Continuation沒有執行完成,說明調用了Continuation.yield或者pin到運載線程中進行了park操作
            afterYield();
        }
    }
}

// Continuation執行完成回調的鉤子方法
private void afterTerminate(boolean executed) {
    assert (state() == TERMINATED) && (carrierThread == null);

    if (executed) {
        if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(true);
    }

    // 如果有其他線程阻塞等待虛擬線程的返回,例如調用了join方法,那麼在這裡解除阻塞
    CountDownLatch termination = this.termination;
    if (termination != null) {
        assert termination.getCount() == 1;
        termination.countDown();
    }
    
    // 如果執行成功則通知線程容器當前線程實例退出,清空線程本地變量引用
    if (executed) {
        // notify container if thread executed
        threadContainer().onExit(this);

        // clear references to thread locals
        clearReferences();
    }
}

// 由於Continuation的yield操作或者調用了Thread.yield()導致Continuation掛起,需要重新把Continuation的包裝器"懶提交"到調度器中
private void afterYield() {
    int s = state();
    assert (s == PARKING || s == YIELDING) && (carrierThread == null);
    // 如果是PARKING狀態,這種對應於Continuation的yield操作調用
    if (s == PARKING) {
        // 更變為PARKED狀態
        setState(PARKED);

        // notify JVMTI that unmount has completed, thread is parked
        if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(false);

        // 得到park許可,並且CAS為RUNNABLE狀態
        if (parkPermit && compareAndSetState(PARKED, RUNNABLE)) {
            // 進行懶提交,如果可能的話,用當前線程作為運載線程繼續執行任務
            lazySubmitRunContinuation();
        }
    } else if (s == YIELDING) {   // 如果是YIELDING狀態,這種對應於調用了Thread.yield
        // 更變為RUNNABLE狀態
        setState(RUNNABLE);

        // notify JVMTI that unmount has completed, thread is runnable
        if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(false);

        // 進行懶提交,如果可能的話,用當前線程作為運載線程繼續執行任
        lazySubmitRunContinuation();
    }
}

這裡唯一的構造函數是比較複雜的,拋開一些鉤子接口,最終想達到的效果就是:

Runnable.run()[runContinuation by carrier thread from executor] --> Continuation.run() --> Continuation.enter() --> VirtualThread.run() --> Runnable.run()[user task]

用戶任務實際被包裹了很多層,在最裏面一層才會回調。VirtualThread中提供了兩個靜態全局的線程池實例,一個用於調度,一個用於喚醒,這裡看看兩個線程池是如何構造的:

// java.lang.VirtualThread

private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler();

// 創建默認的調度器
private static ForkJoinPool createDefaultScheduler() {
    // 線程工廠,默認創建CarrierThread實例,CarrierThread是ForkJoinWorkerThread的一個子類
    ForkJoinWorkerThreadFactory factory = pool -> {
        PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread(pool);
        return AccessController.doPrivileged(pa);
    };
    PrivilegedAction<ForkJoinPool> pa = () -> {
        int parallelism, maxPoolSize, minRunnable;
        String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism");
        String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize");
        String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable");
        if (parallelismValue != null) {
            parallelism = Integer.parseInt(parallelismValue);
        } else {
            parallelism = Runtime.getRuntime().availableProcessors();
        }
        if (maxPoolSizeValue != null) {
            maxPoolSize = Integer.parseInt(maxPoolSizeValue);
            parallelism = Integer.min(parallelism, maxPoolSize);
        } else {
            maxPoolSize = Integer.max(parallelism, 256);
        }
        if (minRunnableValue != null) {
            minRunnable = Integer.parseInt(minRunnableValue);
        } else {
            minRunnable = Integer.max(parallelism / 2, 1);
        }
        Thread.UncaughtExceptionHandler handler = (t, e) -> { };
        boolean asyncMode = true; // FIFO
        return new ForkJoinPool(parallelism, factory, handler, asyncMode,
                        0, maxPoolSize, minRunnable, pool -> true, 30, SECONDS);
    };
    return AccessController.doPrivileged(pa);
}

// 創建調度線程池,用於虛擬線程帶超時時間的unpark操作
private static ScheduledExecutorService createDelayedTaskScheduler() {
    String propValue = GetPropertyAction.privilegedGetProperty("jdk.unparker.maxPoolSize");
    int poolSize;
    if (propValue != null) {
        poolSize = Integer.parseInt(propValue);
    } else {
        // 確保至少有一個工作線程
        poolSize = 1;
    }
    ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
        Executors.newScheduledThreadPool(poolSize, task -> {
            return InnocuousThread.newThread("VirtualThread-unparker", task);
        });
    // 任務取消後馬上從工作隊列移除
    stpe.setRemoveOnCancelPolicy(true);
    return stpe;
}

對於默認調度器(DEFAULT_SCHEDULER)的創建,它是一個ForkJoinPool實例,構造參數的選取如下:

  • parallelism參數由系統變量jdk.virtualThreadScheduler.parallelism決定,默認值為Runtime.getRuntime().availableProcessors(),如果配置了系統參數jdk.virtualThreadScheduler.maxPoolSize則取min(parallelism,maxPoolSize)
  • maxPoolSize參數由系統變量jdk.virtualThreadScheduler.maxPoolSize決定,默認值為min(parallelism, maxPoolSize)
  • minRunnable參數由系統變量jdk.virtualThreadScheduler.minRunnable決定,默認值為max(parallelism / 2, 1)
  • asyncMode參數固定值true,也就是選用FIFO模式
  • keepAliveTime參數為固定值30
  • saturate參數在JDK17引入,是一個Predicate函數,在此固定返回true,用於忽略minRunnable值允許線程池飽和
  • 線程工廠用於創建CarrierThread實例,CarrierThreadForkJoinWorkerThread的子類

Intel 4C8T開發機器環境中,該ForkJoinPool實例創建時候的幾個參數分別為:parallelism = 8, maxPoolSize = 256, minRunnable = 4

對於調度線程池(UNPARKER)的創建,它是一個ScheduledThreadPoolExecutor實例,構造參數的選取如下:

  • corePoolSize參數由系統變量jdk.unparker.maxPoolSize決定,並且確保最小值為1
  • 線程工廠用於創建InnocuousThread實例,線程名稱為VirtualThread-unparker

接着看虛擬線程的啟動方法start()

// java.lang.VirtualThread

@Override
public void start() {
    start(ThreadContainers.root());
}

// 調度虛擬線程讓之運行
@Override
void start(ThreadContainer container) {
    // CAS由NEW轉換為STARTED狀態
    if (!compareAndSetState(NEW, STARTED)) {
        throw new IllegalThreadStateException("Already started");
    }
 
    // 綁定當前虛擬線程到線程容器
    setThreadContainer(container);

    // 標記為未啟動
    boolean started = false;
    // 回調start鉤子方法
    container.onStart(this); // may throw
    try {
        // 從給定容器繼承extent-local綁定參數
        inheritExtentLocalBindings(container);
        // 提交'runContinuation'任務到調度器
        submitRunContinuation();
        // 標記為啟動完成
        started = true;
    } finally {
        // 如果啟動失敗,則標記最終狀態和回調終結鉤子方法
        if (!started) {
            setState(TERMINATED);
            container.onExit(this);
            afterTerminate(/*executed*/ false);
        }
    }
}

// 提交'runContinuation'任務到調度器
private void submitRunContinuation() {
    submitRunContinuation(false);
}

// 提交'runContinuation'任務到調度器,lazySubmit參數決定是否"懶提交"
private void submitRunContinuation(boolean lazySubmit) {
    try {
        if (lazySubmit && scheduler instanceof ForkJoinPool pool) {
            // ForkJoinPool類型調度器並且lazySubmit為true,對runContinuation這個Runnable實例適配為ForkJoinTask類型,進行"懶提交"到ForkJoinPool
            pool.lazySubmit(ForkJoinTask.adapt(runContinuation));
        } else {
            // 非ForkJoinPool類型調度器或者lazySubmit為false,直接使用Executor.execute()提交任務
            scheduler.execute(runContinuation);
        }
    } catch (RejectedExecutionException ree) {
        // 線程池拒絕接收任務,發佈提交失敗事件到JVM
        var event = new VirtualThreadSubmitFailedEvent();
        if (event.isEnabled()) {
            event.javaThreadId = threadId();
            event.exceptionMessage = ree.getMessage();
            event.commit();
        }
        throw ree;
    }
}

ForkJoinPool#lazySubmit()JDK19新增的一個API,它的方法注釋如下:

提交給定的任務,但不保證它最終會在沒有可用活動線程的情況下執行。在某些上下文中,這種方法可以通過依賴於特定於上下文的知識來減少競爭和開銷,即現有線程(如果在此池中操作,則可能包括調用線程)最終將可用來執行任務

使用此方法提交的目的就是希望可以用當前調用線程去執行任務,對於首次提交Continuation任務可能作用不明顯,但是對於Continuation.yield()調用後的再次提交意義比較重大,因為這樣就可以把運行的Continuation.run()方法鏈分配到同一個運載線程實例,在開發者的角度就是虛擬線程任務執行中斷後恢復執行,執行任務的運載線程沒有改變。

源碼中還可以發現,run()方法覆蓋了Thread#run()替換為空實現,因為VirtualThread最終是觸發Continuation#run(),這一點已經在start()方法進行提交和調度。最後分析虛擬線程的阻塞(不帶超時,也就是timeout = 0)、限時阻塞(timeout > 0)、join的實現。先看相對簡單的joinNanos()

// java.lang.VirtualThread
// Thread.join() --> VirtualThread.joinNanos()

// 虛擬線程join調用
boolean joinNanos(long nanos) throws InterruptedException {
    // 如果狀態為TERMINATED直接返回true
    if (state() == TERMINATED)
        return true;
    // 獲取數柵欄實例
    CountDownLatch termination = getTermination();
    // 再次驗證如果狀態為TERMINATED直接返回true
    if (state() == TERMINATED)
        return true;

    // 如果nanos為0則調用CountDownLatch.await()阻塞
    if (nanos == 0) {
        termination.await();
    } else {
        // 如果nanos大於0則調用CountDownLatch.await(nanos,TimeUnit)限時阻塞
        boolean terminated = termination.await(nanos, NANOSECONDS);
        if (!terminated) {
            // 阻塞到超時時限過了返回,非解除阻塞下的正常返回
            return false;
        }
    }
    assert state() == TERMINATED;
    // 解除阻塞下的正常返回
    return true;
}

// 懶創建終結倒數柵欄實例,設置資源值為1,這裡用到CAS是考慮之前已經創建和保存到成員變量,如果已創建則直接選用成員變量的那個實例
private CountDownLatch getTermination() {
    CountDownLatch termination = this.termination;
    if (termination == null) {
        termination = new CountDownLatch(1);
        if (!U.compareAndSetReference(this, TERMINATION, null, termination)) {
            termination = this.termination;
        }
    }
    return termination;
}

接着看虛擬線程阻塞和限時阻塞的現實:

// java.lang.VirtualThread
// Thread.sleep() --> VirtualThread.sleepNanos()

// 給定休眠時間讓當前虛擬線程休眠
void sleepNanos(long nanos) throws InterruptedException {
    assert Thread.currentThread() == this;
    // nanos必須大於等於0
    if (nanos >= 0) {
        // 如果支持線程休眠事件發佈則在休眠處理前後處理休眠事件,最終的休眠操作調用doSleepNanos()完成
        if (ThreadSleepEvent.isTurnedOn()) {
            ThreadSleepEvent event = new ThreadSleepEvent();
            try {
                event.time = nanos;
                event.begin();
                doSleepNanos(nanos);
            } finally {
                event.commit();
            }
        } else {
            doSleepNanos(nanos);
        }
    }
}

// 讓當前線程休眠給定的睡眠時間(單位為納秒)。如果nanos為0時,線程將嘗試yield
private void doSleepNanos(long nanos) throws InterruptedException {
    assert nanos >= 0;
    // 響應中斷清理中斷狀態,拋出中斷異常
    if (getAndClearInterrupt())
        throw new InterruptedException();
    if (nanos == 0) {
        // nanos為0的時候直接進行yield操作,具體是Continuation.yield()
        tryYield();
    } else {
        // park for the sleep time
        try {
            long remainingNanos = nanos;
            // 臨時變量記錄開始休眠時間
            long startNanos = System.nanoTime();
            while (remainingNanos > 0) {
                // 剩餘休眠時間大於0納秒,進行park操作
                parkNanos(remainingNanos);
                // 響應中斷清理中斷狀態,拋出中斷異常
                if (getAndClearInterrupt()) {
                    throw new InterruptedException();
                }
                // 重新計算剩餘休眠事件
                remainingNanos = nanos - (System.nanoTime() - startNanos);
            }
        } finally {
            // park會消耗park許可,走到這裡說明unpark了,可以重新設置許可
            setParkPermit(true);
        }
    }
}

// 當前虛擬線程park(阻塞)直至指定等候時間,進行unpark操作或者中斷也能解除park狀態
@Override
void parkNanos(long nanos) {
    assert Thread.currentThread() == this;

    // 已經消耗了park許可或者處於中斷狀態,直接返回
    if (getAndSetParkPermit(false) || interrupted)
        return;

    // 當前虛擬線程park(阻塞)直至指定等候時間
    if (nanos > 0) {
        // 記錄開始park的時間
        long startTime = System.nanoTime();
        // 記錄是否yield成功
        boolean yielded;
        // 通過調度線程池提交一個延時執行的unpark任務,用於進行unpark操作解除當前虛擬線程阻塞等待
        Future<?> unparker = scheduleUnpark(nanos);
        // 設置為PARKING狀態
        setState(PARKING);
        try {
            // 執行Continuation.yield()
            yielded = yieldContinuation();
        } finally {
            assert (Thread.currentThread() == this)
                    && (state() == RUNNING || state() == PARKING);
            // 執行Continuation.yield()執行完畢後,如果該unparker任務未完成則進行取消操作
            cancel(unparker);
        }

        // Continuation.yield()調用失敗,則重新計算等待時間並基於運載線程進行park操作
        if (!yielded) {
            long deadline = startTime + nanos;
            if (deadline < 0L)
                deadline = Long.MAX_VALUE;
            parkOnCarrierThread(true, deadline - System.nanoTime());
        }
    }
}

// 當前虛擬線程的運載線程park(阻塞)直至指定等候時間,這就是前面提到過的pinned thread產生的過程
private void parkOnCarrierThread(boolean timed, long nanos) {
    assert state() == PARKING;

    var pinnedEvent = new VirtualThreadPinnedEvent();
    pinnedEvent.begin();
    // 設置狀態為PINNED
    setState(PINNED);
    try {
        // 如果沒有park許可,則不處理,否則使用Usafe的park api進行平台線程阻塞
        if (!parkPermit) {
            if (!timed) {
                U.park(false, 0);
            } else if (nanos > 0) {
                U.park(false, nanos);
            }
        }
    } finally {
        // 阻塞解除後狀態為RUNNING
        setState(RUNNING);
    }

    // 解除阻塞後此park操作消耗了park許可
    setParkPermit(false);

    pinnedEvent.commit();
}

@ChangesCurrentThread
private Future<?> scheduleUnpark(long nanos) {
    Thread carrier = this.carrierThread;
    // need to switch to current platform thread to avoid nested parking
    carrier.setCurrentThread(carrier);
    try {
        return UNPARKER.schedule(() -> unpark(), nanos, NANOSECONDS);
    } finally {
        carrier.setCurrentThread(this);
    }
}

// 如果unpark任務未完成則取消它,這個過程需要切換到當前平台線程以避免嵌套park操作
@ChangesCurrentThread
private void cancel(Future<?> future) {
    if (!future.isDone()) {
        Thread carrier = this.carrierThread;
        // need to switch to current platform thread to avoid nested parking
        carrier.setCurrentThread(carrier);
        try {
            future.cancel(false);
        } finally {
            carrier.setCurrentThread(this);
        }
    }
}

// unpark操作,重新啟用當前虛擬線程進行調度,如果虛擬線程處於park狀態會將它解除阻塞
@Override
@ChangesCurrentThread
void unpark() {
    Thread currentThread = Thread.currentThread();
    // 重置park許可false -> true,並且判斷當前線程是虛擬線程
    if (!getAndSetParkPermit(true) && currentThread != this) {
        int s = state();
        // 命中虛擬線程PARKED狀態,則CAS設置為RUNNABLE狀態,並且重新提交Continuation的Runnable包裝器到調度器中,這個提交過程需要切換到當前運載線程,然後恢復為當前虛擬線程
        if (s == PARKED && compareAndSetState(PARKED, RUNNABLE)) {
            if (currentThread instanceof VirtualThread vthread) {
                Thread carrier = vthread.carrierThread;
                carrier.setCurrentThread(carrier);
                try {
                    submitRunContinuation();
                } finally {
                    carrier.setCurrentThread(vthread);
                }
            } else {
                submitRunContinuation();
            }
        } else if (s == PINNED) {
            // park操作基於運載線程阻塞,則調用Usafe的unpark api進行喚醒,喚醒後在parkOnCarrierThread()中會重新被修改為RUNNING狀態
            synchronized (carrierThreadAccessLock()) {
                Thread carrier = carrierThread;
                if (carrier != null && state() == PINNED) {
                    U.unpark(carrier);
                }
            }
        }
    }
}

// 嘗試執行Continuation.yield()
void tryYield() {
    assert Thread.currentThread() == this;
    // 設置狀態為YIELDING
    setState(YIELDING);
    try {
        // 執行Continuation.yield(),忽略返回值處理
        yieldContinuation();
    } finally {
        assert Thread.currentThread() == this;
        // 虛擬線程重新mount並且運行,設置為RUNNING狀態
        if (state() != RUNNING) {
            assert state() == YIELDING;
            setState(RUNNING);
        }
    }
}

// 執行Continuation.yield()
private boolean yieldContinuation() {
    boolean notifyJvmti = notifyJvmtiEvents;
    // 當前虛擬線程進行unmount操作
    if (notifyJvmti) notifyJvmtiUnmountBegin(false);
    unmount();
    try {
        // 執行Continuation.yield()
        return Continuation.yield(VTHREAD_SCOPE);
    } finally {
        // 當前虛擬線程重新進行mount操作
        mount();
        if (notifyJvmti) notifyJvmtiMountEnd(false);
    }
}

總的來說就是:

  • 阻塞:通過Continuation.yield()調用實現阻塞,主要是提供給Thread.sleep()調用
  • 限時阻塞:Continuation.yield()調用之前計算喚醒時間並且向調度線程池(UNPARKER)提交一個延時執行unpark任務通過”懶提交”方式重新運行Continuation.run()調用鏈解除阻塞,主要是提供給Thread.sleep(long nanos)調用
  • join(Nanos):通過CountDownLatch.await()調用實現阻塞,在虛擬線程終結鉤子方法afterTerminate()中調用CountDownLatch.countDown()解除阻塞,join(Nanos)()方法主要是提供給Thread.join()調用
  • 特殊情況:如果Continuation.yield()調用失敗,則會通過Unsafe提供的park API阻塞在運載線程上,在unpark任務中通過Unsafe提供的unpark API解除阻塞

分析完虛擬線程實現的核心代碼,這裡總結一下虛擬線程的狀態切換,由於支持的狀態比較多,這裡通過一張狀態圖進行展示:

vt-source-code-13

還有其他像獲取虛擬線程棧、JVM狀態通知、獲取虛擬線程狀態、狀態切換的CAS操作等方法限於篇幅這裡就不展開分析。

線程建造器

線程建造器和線程工廠建造器用於快速創建平台線程實例、平台線程工廠實例、虛擬線程實例或者虛擬線程工廠實例。熟悉Builder模式的開發者看這個新引入的功能源碼應該比較輕鬆:

// 內部類:java.lang.Thread.Builder
// Builder只對OfPlatform、OfVirtual、BaseThreadBuilder開放繼承權限
@PreviewFeature(feature = PreviewFeature.Feature.VIRTUAL_THREADS)
public sealed interface Builder
        permits Builder.OfPlatform,
                Builder.OfVirtual,
                ThreadBuilders.BaseThreadBuilder {

    // 設置線程名稱
    Builder name(String name);
    
    // 設置線程名稱規則,最終線程名稱為:$prefix$start++
    // 如prefix: worker-, start: 0,則worker-0, worker-1.... worker-n
    Builder name(String prefix, long start);

    Builder allowSetThreadLocals(boolean allow);

    // 是否開啟InheritableThreadLocal
    Builder inheritInheritableThreadLocals(boolean inherit);

    // 設置未捕獲異常處理器
    Builder uncaughtExceptionHandler(UncaughtExceptionHandler ueh);

    // 設置非啟動前的任務實例
    Thread unstarted(Runnable task);

    // 設置任務實例並且馬上啟動
    Thread start(Runnable task);

    // 構建線程工廠實例
    ThreadFactory factory();

    // 平台線程Builder接口
    @PreviewFeature(feature = PreviewFeature.Feature.VIRTUAL_THREADS)
    sealed interface OfPlatform extends Builder
            permits ThreadBuilders.PlatformThreadBuilder {

        @Override OfPlatform name(String name);
        @Override OfPlatform name(String prefix, long start);
        @Override OfPlatform allowSetThreadLocals(boolean allow);
        @Override OfPlatform inheritInheritableThreadLocals(boolean inherit);
        @Override OfPlatform uncaughtExceptionHandler(UncaughtExceptionHandler ueh);

        // 設置平台線程組
        OfPlatform group(ThreadGroup group);

        // 設置新建平台線程是否為守護線程
        OfPlatform daemon(boolean on);

        // 判斷新建平台線程是否為守護線程
        default OfPlatform daemon() {
            return daemon(true);
        }

        // 設置優先級
        OfPlatform priority(int priority);

        // 設置線程棧大小
        OfPlatform stackSize(long stackSize);
    }

    // 虛擬線程Builder接口
    @PreviewFeature(feature = PreviewFeature.Feature.VIRTUAL_THREADS)
    sealed interface OfVirtual extends Builder
            permits ThreadBuilders.VirtualThreadBuilder {

        @Override OfVirtual name(String name);
        @Override OfVirtual name(String prefix, long start);
        @Override OfVirtual allowSetThreadLocals(boolean allow);
        @Override OfVirtual inheritInheritableThreadLocals(boolean inherit);
        @Override OfVirtual uncaughtExceptionHandler(UncaughtExceptionHandler ueh);
    }
}

上面的Builder接口都在java.lang.ThreadBuilders中進行實現,因為整體實現比較簡單,這裡只看全新引入的VirtualThreadFactoryVirtualThreadBuilder

// 內部類:java.lang.ThreadBuilders.VirtualThreadFactory
private static class VirtualThreadFactory extends BaseThreadFactory {

    // 執行器或者說調度器實例
    private final Executor scheduler;
    
    // 線程工廠構造函數基本與平台線程工廠實現一致,但是必須提供執行器實例
    VirtualThreadFactory(Executor scheduler,
                         String name,
                         long start,
                         int characteristics,
                         UncaughtExceptionHandler uhe) {
        super(name, start, characteristics, uhe);
        this.scheduler = scheduler;
    }

    @Override
    public Thread newThread(Runnable task) {
        Objects.requireNonNull(task);
        // 獲取下一個虛擬線程名稱,start >= 0則為$name$start++,否則固定為name
        String name = nextThreadName();
        // 創建新的虛擬線程實例
        Thread thread = newVirtualThread(scheduler, name, characteristics(), task);
        UncaughtExceptionHandler uhe = uncaughtExceptionHandler();
        if (uhe != null)
            // 設置未捕獲異常處理器
            thread.uncaughtExceptionHandler(uhe);
        return thread;
    }
}

// 靜態方法:java.lang.ThreadBuilders#newVirtualThread()
static Thread newVirtualThread(Executor scheduler,
                               String name,
                               int characteristics,
                               Runnable task) {
    // 當前JVM支持Continuation,則創建初始化一個新的虛擬線程實例
    if (ContinuationSupport.isSupported()) {
        return new VirtualThread(scheduler, name, characteristics, task);
    } else {
        // 當前的JVM不支持Continuation,則虛擬線程退化為一個平台線程的包裝類,要求執行器必須為空
        if (scheduler != null)
            throw new UnsupportedOperationException();
        return new BoundVirtualThread(name, characteristics, task);
    }
}

// 內部類:java.lang.ThreadBuilders.VirtualThreadBuilder
static final class VirtualThreadBuilder
        extends BaseThreadBuilder<OfVirtual> implements OfVirtual {

    // 執行器成員變量
    private Executor scheduler;

    VirtualThreadBuilder() {
    }
    
    // 目前VirtualThreadBuilder的構造都是默認修飾符,Executor只能在單元測試中調用
    // 也就是用戶無法設置Executor,因為所有虛擬線程默認都是由全局的ForkJoinPool調度
    // invoked by tests
    VirtualThreadBuilder(Executor scheduler) {
        if (!ContinuationSupport.isSupported())
            throw new UnsupportedOperationException();
        this.scheduler = Objects.requireNonNull(scheduler);
    }
    
    // 創建虛擬線程實例,設置任務,處於非啟動狀態
    @Override
    public Thread unstarted(Runnable task) {
        Objects.requireNonNull(task);
        var thread = newVirtualThread(scheduler, nextThreadName(), characteristics(), task);
        UncaughtExceptionHandler uhe = uncaughtExceptionHandler();
        if (uhe != null)
            thread.uncaughtExceptionHandler(uhe);
        return thread;
    }

    // 創建虛擬線程實例,設置任務並且馬上啟動
    @Override
    public Thread start(Runnable task) {
        Thread thread = unstarted(task);
        thread.start();
        return thread;
    }
    
    // 初始化虛擬線程工廠實例
    @Override
    public ThreadFactory factory() {
        return new VirtualThreadFactory(scheduler, name(), counter(), characteristics(),
                uncaughtExceptionHandler());
    }
}

值得注意的是:虛擬線程實現上來看都是”守護線程”,也就是說虛擬線程不需要設置daemon參數。平台線程或者虛擬線程的建造器或者工廠實現都是包訪問權限的內部類,其父類使用了permits關鍵字指定繼承範圍,目前是只能通過鏈式設置值的方式初始化,無法修改其中的成員或者方法。

其他探討

其他探討主要包括:

  • 自定義執行器
  • 內存佔用評估
  • 局限性
  • 適用場景
  • JUC親和性

自定義執行器

雖然虛擬線程建造器屏蔽了執行器Executor實例的公共訪問權限,在目前預留功能版本下只能所有虛擬線程的任務最終都是由全局的ForkJoinPool執行,可以通過VarHandle對其進行強制值設置,這樣就能修改虛擬線程底層的載體線程為我們自定義線程池中的平台線程,例如這樣:

public class VirtualThreadCustomExecutor {

    /**
     * virtual thread with custom executor
     * add VM options: --add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.lang.reflect=ALL-UNNAMED
     */
    public static void main(String[] args) throws Exception {
        ExecutorService carrier = Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable);
            thread.setDaemon(true);
            thread.setName("CustomVirtualCarrier");
            return thread;
        });
        Thread.Builder.OfVirtual ofVirtual = Thread.ofVirtual();
        Class<?> klass = Class.forName("java.lang.ThreadBuilders$VirtualThreadBuilder");
        VarHandle varHandle = MethodHandles.privateLookupIn(klass, MethodHandles.lookup()).findVarHandle(klass, "scheduler", Executor.class);
        varHandle.set(ofVirtual, carrier);
        ThreadFactory factory = ofVirtual.name("VirtualWorker-", 0).allowSetThreadLocals(false).factory();
        ExecutorService virtualWorkerPool = Executors.newThreadPerTaskExecutor(factory);
        virtualWorkerPool.execute(() -> {
            Thread thread = Thread.currentThread();
            System.out.printf("first task ==> 線程名稱:%s,載體線程名稱:%s,是否虛擬線程:%s\n", thread.getName(), getCurrentCarrierThreadName(thread), thread.isVirtual());
        });
        virtualWorkerPool.execute(() -> {
            Thread thread = Thread.currentThread();
            System.out.printf("second task ==> 線程名稱:%s,載體線程名稱:%s,是否虛擬線程:%s\n", thread.getName(), getCurrentCarrierThreadName(thread), thread.isVirtual());
        });
        Thread.sleep(Long.MAX_VALUE);
    }

    private static String getCurrentCarrierThreadName(Thread currentThread) {
        if (currentThread.isVirtual()) {
            try {
                MethodHandle methodHandle = MethodHandles.privateLookupIn(Thread.class, MethodHandles.lookup())
                        .findStatic(Thread.class, "currentCarrierThread", MethodType.methodType(Thread.class));
                Thread carrierThread = (Thread) methodHandle.invoke();
                return carrierThread.getName();
            } catch (Throwable e) {
                e.printStackTrace();
            }
        }
        return "UNKNOWN";
    }
}

// 運行結果
first task ==> 線程名稱:VirtualWorker-0,載體線程名稱:CustomVirtualCarrier,是否虛擬線程:true
second task ==> 線程名稱:VirtualWorker-1,載體線程名稱:CustomVirtualCarrier,是否虛擬線程:true

可以看到最終效果,虛擬線程中的任務最終在自定義線程池中的唯一平台線程中運行。這裡只是做一個實驗性例子,使用反射或者MethodHandle對未穩定的API進行操作以後有很大概率會出現兼容性問題,不建議在生產環境這樣操作,待虛擬線程完成預覽正式發佈後應該會提供對應的API讓開發者設置自定義執行器。

資源佔用評估

平台線程(單個實例)的資源佔用:

  • 通常是預留1 mb線程棧空間,額外需要16 kb操作系統核心數據源結構
  • 對於已經啟動的平台線程實例,會佔據2000+ byte數據,包括VM中平台線程的元數據等

虛擬線程(單個實例)的資源佔用:

  • Continuation棧會佔據數百byte到數百kb內存空間
  • 虛擬線程實例會佔據200 - 240 byte

兩者對比一看,理論上得知單個平台線程佔用的內存空間至少是kb級別的,而通常單個虛擬線程實例佔用的內存空間是byte級別,兩者的內存佔用相差1個數量級。這裡可以使用NMT參數和jcmd命令進行驗證,見下面的代碼和結果。

public class PlatformThreadFootprint {

    private static final int COUNT = 100000;

    /**
     * platform thread footprint -Xms1g -Xmx1g -XX:NativeMemoryTracking=detail
     *
     * @param args args
     */
    public static void main(String[] args) throws Exception {
        for (int i = 0; i < COUNT; i++) {
            new Thread(() -> {
                try {
                    Thread.sleep(Long.MAX_VALUE);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }, String.valueOf(i)).start();
        }
        Thread.sleep(Long.MAX_VALUE);
    }
}

上面的程序運行後啟動10w平台線程,通過NMT參數和jcmd命令查看所有線程佔據的內存空間如下:

vt-source-code-3

可見總已提交內存大部分來自創建的平台線程,這些平台線程佔用了大概613 mb空間,它們的總線程棧空間佔用約為5862 mb,兩者加起來佔據總使用內存(7495 mb)的86 %以上。用類似的方式編寫運行虛擬線程的程序:

public class VirtualThreadFootprint {

    private static final int COUNT = 100000;

    /**
     * virtual thread footprint -Xms10m -Xmx100m -XX:NativeMemoryTracking=detail
     *
     * @param args args
     */
    public static void main(String[] args) throws Exception {
        for (int i = 0; i < COUNT; i++) {
            Thread.startVirtualThread(() -> {
                try {
                    Thread.sleep(Long.MAX_VALUE);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        Thread.sleep(Long.MAX_VALUE);
    }
}

上面的程序運行後啟動10w虛擬線程,同樣通過NMT參數和jcmd命令查看:

vt-source-code

這裡有意縮小虛擬線程程序的最小最大堆內存為-Xms10m -Xmx100m,程序依然正常運行,並且堆內存的實際佔用量和總內存的實際佔用量都不超過200 mb,由此可以證明虛擬線程確實在極大量創建的前提下不會佔據大量內存空間(這裡暫時沒有考慮到複雜調用情況下Continuation棧佔據內存空間大小,不過已經大幅度優於平台線程)。

局限性

當前的虛擬線程實現有如下局限性:

  • Continuation棧存在native方法或者外部函數(FFMAPI,見JEP-424)調用不能進行yield操作
  • 當持有監視器或者等待監視器的時候(一般是使用了synchronized關鍵字或者Object.wait())不能進行yield操作
  • Continuation棧存在native方法調用、外部函數調用或者當持有監視器或者等待監視器的時候,虛擬線程會Pin到平台線程,導致虛擬線程無法從平台線程卸載,雖然不會影響程序正確執行,但是會影響性能,也就是如果這些虛擬線程是可復用的,永遠無法切換到其運載線程,導致任務切換開銷永久性增大
  • 虛擬線程可以像平台線程一樣使用ThreadLocal,但是由於一般虛擬線程實例是會大量創建的,ThreadLocal本質是哈希表的一個鏈接,創建大量哈希表會帶來額外的內存開銷(這一點不算局限性,更接近於開發建議,建議使用虛擬線程的時候禁用ThreadLocal

對於前三點出現的情況,一些文檔中提到會導致虛擬線程無法從運載線程卸載,這個現象稱為Pinned Thread,通過系統參數jdk.tracePinnedThreads可以打印具體的Pinned Thread棧,從而定位到哪些虛擬線程被固定到哪些平台線程中。對於這個問題,目前可以通過編程規範去規避,也就是虛擬線程執行的任務盡量規避調用native方法或者外部函數,對於synchronized關鍵字可以使用JUC中的鎖API進行替換,例如ReentrantLock等等。

適用場景

基於繼承的特性,通過對java.lang.Thread(虛擬線程的超類)薄封裝,也就是基於ThreadAPI可以直接透明地實現虛擬線程的掛起和恢復等操作,對使用者屏蔽了虛擬線程複雜的調度實現。由於虛擬線程實例佔據的資源比較少,可以大量地創建而無須考慮池化,因此滿足類似下面的使用場景:

  • 大批量的處理時間較短的計算任務
  • 大量的IO阻塞等待處理
  • thread-per-request風格的應用程序,例如主流的Tomcat線程模型或者基於類似線程模型實現的SpringMVC框架等等

JUC親和性

還是基於繼承的特性,java.lang.VirtualThreadjava.lang.Thread子類型,因此使用到Thread類型的地方原則上可以透明使用VirtualThread,就是說通過下面的形式可以池化虛擬線程

public class VirtualThreadPool {
    
    public static void main(String[] args) throws Exception {
        ThreadFactory factory = Thread.ofVirtual().allowSetThreadLocals(false)
                .name("VirtualFactoryWorker-", 0)
                .inheritInheritableThreadLocals(false)
                .factory();
        // core = max = 10
        ThreadPoolExecutor fixedVirtualThreadPool
                = new ThreadPoolExecutor(10, 10, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory);
        fixedVirtualThreadPool.execute(() -> {
            Thread thread = Thread.currentThread();
            System.out.printf("線程名稱:%s,是否虛擬線程:%s\n", thread.getName(), thread.isVirtual());
        });
        fixedVirtualThreadPool.shutdown();
        fixedVirtualThreadPool.awaitTermination(5, TimeUnit.SECONDS);
    }
}

但是前面也提到過:由於虛擬線程本身是輕量級的,在執行計算任務的時候更建議每個任務新創建一個虛擬線程實例,因為池化操作本身是會引入額外開銷。另外,JUC下很多類庫都是基於AQS數據結構實現,而AQS中無論獨佔模式還是共享模式,在隊列中等待的節點以及搶佔虛擬頭節點的對象本質都是Thread實例,基於這一點來看,AQS也是無縫適配VirtualThread。見下面的例子:

public class VirtualThreadJuc {

    public static void main(String[] args) throws Exception {
        CountDownLatch latch = new CountDownLatch(1);
        Thread.startVirtualThread(() -> {
            try {
                System.out.println("before await");
                latch.await();
                System.out.println("after await");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Thread thread = Thread.currentThread();
            System.out.printf("線程名稱:%s,是否虛擬線程:%s\n", thread.getName(), thread.isVirtual());
        });
        Thread.sleep(1000);
        System.out.println("main count down");
        latch.countDown();
        Thread.sleep(Long.MAX_VALUE);
    }
}

// 運行結果
before await
main count down
after await
線程名稱:,是否虛擬線程:true

總的來說,VirtualThreadJUC既有類庫是親和的,大部分類庫可以在虛擬線程任務中使用,並且不建議池化虛擬線程而是從使用per task per virtual thread的編程模式。

小結

本文詳細介紹了平台線程與虛擬線程的區別、虛擬線程實現原理、虛擬線程的源碼實現以及關於虛擬線程的一些探討,希望能夠幫到讀者理解Java虛擬線程。在JDK19中,虛擬線程是預覽特性,希望這個特性能夠早點發佈GA版本,這樣才能填補Java協程這一塊短板,也能讓大量基礎API和框架進行一輪革新。

參考資料:

  • JEP-425//openjdk.org/jeps/425
  • JVMLS2018.pdf(這份PDF文檔詳細地介紹了Loom項目的目標和實現方式)://cr.openjdk.java.net/~rpressler/loom/loom/JVMLS2018.pdf

(本文完 e-a-20221005 c-3-d)