本文转载至:虚拟线程 - VirtualThread源码透视 - throwable - 博客园 (cnblogs.com)
一. 前提 JDK19
于2022-09-20
发布GA
版本,该版本提供了虚拟线程的预览功能。下载JDK19
之后翻看了一下有关虚拟线程的一些源码,跟早些时候的Loom
项目构建版本基本并没有很大出入,也跟第三方JDK
如鹅厂的Kona
虚拟线程实现方式基本一致,这里分析一下虚拟线程设计与源码实现。
因为引入了虚拟线程,原来JDK
存在java.lang.Thread
类,俗称线程,为了更好地区分虚拟线程和原有的线程类,引入了一个全新类java.lang.VirtualThread
(Thread
类的一个子类型),直译过来就是”虚拟线程”。
题外话:在Loom
项目早期规划里面,核心API
其实命名为Fiber
,直译过来就是”纤程”或者”协程”,后来成为了废案,在一些历史提交的Test
类或者文档中还能看到类似于下面的代码:
1 2 3 4 5 6 7 8 9 10 11 12 Fiber f = Fiber.execute({ out.println("Good morning" ); readLock.lock(); try { out.println("Good night" ); } finally { readLock.unlock(); } out.println("Good night" ); });
Thread
在此基础上做了不少兼容性工作。此外,还应用了建造者模式引入了线程建造器,提供了静态工厂方法Thread#ofPlatform()
和Thread#ofVirtual()
分别用于实例化Thread
(工厂)建造器和VirtualThread
(工厂)建造器,顾名思义,两种建造器分别用于创建Thread
或者VirtualThread
,例如:
1 2 3 4 5 6 7 8 9 10 11 12 Thread platformThread = Thread.ofPlatform().daemon().name("worker" ).unstarted(runnable);ThreadFactory platformThreadFactory = Thread.ofPlatform().daemon().name("worker-" , 0 ).factory();Thread virtualThread = Thread.ofVirtual().name("virtual-worker" ).unstarted(runnable);ThreadFactory virtualThreadFactory = Thread.ofVirtual().name("virtual-worker-" , 0 ).factory();
更新的JDK
文档中也把原来的Thread
称为Platform Thread
,可以更明晰地与Virtual Thread
区分开来。这里Platform Thread
直译为”平台线程”,其实就是”虚拟线程”出现之前的老生常谈的”线程”。
后文会把Platform Thread称为平台线程,Virtual Thread称为虚拟线程,或者直接用其英文名称
那么平台线程与虚拟线程的联系和区别是什么?JDK
中的每个java.lang.Thread
实例也就是每个平台线程实例都在底层操作系统线程上运行Java
代码,并且平台线程在运行代码的整个生命周期内捕获系统线程 。可以得出一个结论,平台线程与底层系统线程是一一对应的,平台线程实例本质是由系统内核的线程调度程序进行调度,并且平台线程的总数量受限于系统线程的总数量。
总的来说,平台线程有下面的一些特点或者说限制:
资源有限导致系统线程总量有限,进而导致与系统线程一一对应的平台线程有限
平台线程的调度依赖于系统的线程调度程序,当平台线程创建过多,会消耗大量资源用于处理线程上下文切换
每个平台线程都会开辟一块私有的栈空间,大量平台线程会占据大量内存
这些限制导致开发者不能极大量地创建平台线程,为了满足性能需要,需要引入池化技术、添加任务队列构建消费者-生产者模式等方案去让平台线程适配多变的现实场景。显然,开发者们迫切需要一种轻量级线程实现,刚好可以弥补上面提到的平台线程的限制,这种轻量级线程可以满足:
可以大量创建,例如十万级别、百万级别,而不会占据大量内存
由JVM
进行调度和状态切换,并且与系统线程”松绑”
用法与原来平台线程差不多,或者说尽量兼容平台线程现存的API
Loom
项目中开发的虚拟线程就是为了解决这个问题,看起来它的运行示意图如下:
当然,平台线程不是简单地与虚拟线程进行1:N
的绑定,后面的章节会深入分析虚拟线程的运行原理。
三. 虚拟线程实现原理 虚拟线程是一种轻量级(用户模式)线程,这种线程是由Java
虚拟机调度,而不是操作系统。虚拟线程占用空间小,任务切换开销几乎可以忽略不计,因此可以极大量地创建和使用。总体来看,虚拟线程实现如下:
1 virtual thread = continuation + scheduler
虚拟线程会把任务(一般是java.lang.Runnable
)包装到一个Continuation
实例中:
当任务需要阻塞挂起的时候,会调用Continuation
的yield
操作进行阻塞
当任务需要解除阻塞继续执行的时候,Continuation
会被继续执行
Scheduler
也就是执行器,会把任务提交到一个载体线程池中执行:
执行器是java.util.concurrent.Executor
的子类
虚拟线程框架提供了一个默认的ForkJoinPool
用于执行虚拟线程任务
下文会把carrier thread称为”载体线程”,指的是负责执行虚拟线程中任务的平台线程,或者说运行虚拟线程的平台线程称为它的载体线程
操作系统调度系统线程,而Java
平台线程与系统线程一一映射,所以平台线程被操作系统调度,但是虚拟线程是由JVM
调度。JVM
把虚拟线程分配给平台线程的操作称为mount
(挂载),反过来取消分配平台线程的操作称为unmount
(卸载):
mount
操作:虚拟线程挂载到平台线程,虚拟线程中包装的Continuation
栈数据帧或者引用栈数据会被拷贝到平台线程的线程栈,这是一个从堆复制到栈的过程
unmount
操作:虚拟线程从平台线程卸载,大多数虚拟线程中包装的Continuation
栈数据帧会留在堆内存中
这个mount -> run -> unmount
过程用伪代码表示如下:
1 2 3 4 5 6 mount(); try { Continuation.run(); } finally { unmount(); }
从Java
代码的角度来看,虚拟线程和它的载体线程暂时共享一个OS
线程实例这个事实是不可见,因为虚拟线程的堆栈跟踪和线程本地变量与平台线程是完全隔离的。JDK
中专门是用了一个FIFO
模式的ForkJoinPool
作为虚拟线程的调度程序,从这个调度程序看虚拟线程任务的执行流程大致如下:
一个虚拟线程被分配平台线程,该平台线程作为运载线程执行虚拟线程中的任务
虚拟线程运行其Continuation
,从而执行基于Runnable
包装的用户任务
虚拟线程任务执行完成,标记Continuation
终结,标记虚拟线程为终结状态,清空一些上下文变量,运载线程”返还”到调度器(线程池)中作为平台线程等待处理下一个任务
上面是描述一般的虚拟线程任务执行情况,在执行任务时候首次调用Continuation#run()
获取锁(ReentrantLock
)的时候会触发Continuation
的yield
操作让出控制权,等待虚拟线程重新分配运载线程并且执行,见下面的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class VirtualThreadLock { public static void main (String[] args) throws Exception { ReentrantLock lock = new ReentrantLock (); Thread.startVirtualThread(() -> { lock.lock(); }); Thread.sleep(1000 ); Thread.startVirtualThread(() -> { System.out.println("first" ); lock.lock(); try { System.out.println("second" ); } finally { lock.unlock(); } System.out.println("third" ); }); Thread.sleep(Long.MAX_VALUE); } }
虚拟线程中任务执行时候首次调用Continuation#run()
执行了部分任务代码,然后尝试获取锁,会导致Continuation
的yield
操作让出控制权(任务切换),也就是unmount
,运载线程栈数据会移动到Continuation
栈的数据帧中,保存在堆内存,虚拟线程任务完成(但是虚拟线程没有终结,同时其Continuation
也没有终结和释放),运载线程被释放到执行器中等待新的任务;如果Continuation
的yield
操作失败,则会对运载线程进行park
调用,阻塞在运载线程上
当锁持有者释放锁之后,会唤醒虚拟线程获取锁(成功后),虚拟线程会重新进行mount
,让虚拟线程任务再次执行,有可能是分配到另一个运载线程中执行,Continuation
栈会的数据帧会被恢复到运载线程栈中,然后再次调用Continuation#run()
恢复任务执行:
最终虚拟线程任务执行完成,标记Continuation
终结,标记虚拟线程为终结状态,清空一些上下文变量,运载线程”返还”到调度器(线程池)中作为平台线程等待处理下一个任务
Continuation
组件十分重要,它既是用户真实任务的包装器,也是任务切换虚拟线程与平台线程之间数据转移的一个句柄,它提供的yield
操作可以实现任务上下文的中断和恢复。由于Continuation
被封闭在java.base/jdk.internal.vm
下,可以通过增加编译参数--add-exports java.base/jdk.internal.vm=ALL-UNNAMED
暴露对应的功能,从而编写实验性案例,IDEA
中可以按下图进行编译参数添加:
然后编写和运行下面的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 import jdk.internal.vm.Continuation;import jdk.internal.vm.ContinuationScope;public class ContinuationDemo { public static void main (String[] args) { ContinuationScope scope = new ContinuationScope ("scope" ); Continuation continuation = new Continuation (scope, () -> { System.out.println("Running before yield" ); Continuation.yield (scope); System.out.println("Running after yield" ); }); System.out.println("First run" ); continuation.run(); System.out.println("Second run" ); continuation.run(); System.out.println("Done" ); } }
运行结果:
1 2 3 4 5 6 First run Running before yield Second run Running after yield Done
这里可以看出Continuation
的奇妙之处,Continuation
实例进行yield
调用后,再次调用其run
方法就可以从yield
的调用之处往下执行,从而实现了程序的中断和恢复。
四. 源码分析 主要包括:
Continuation
VirtualThread
线程建造器
4.1 Continuation Continuation
直译为”连续”,一般来说表示一种语言构造,使语言可以在任意点保存执行状态并且在之后的某个点返回 。在JDK
中对应类jdk.internal.vm.Continuation
,这个类只有一句类注释A one-shot delimited continuation
,直译为一个只能执行一次的回调函数 。由于Continuation
的成员和方法缺少详细的注释,并且大部分功能由JVM
实现,这里只能阅读其一些骨干源码和上一小节编写的Continuation
相关例子去了解其实现(笔者C
语言比较薄弱,有兴趣的可以翻阅JVM
的源码)。先看成员变量和构造函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 private static final boolean PRESERVE_EXTENT_LOCAL_CACHE;private final Runnable target;private final ContinuationScope scope;private Continuation parent;private Continuation child;private StackChunk tail;private boolean done;private volatile boolean mounted = false ;private Object yieldInfo;private boolean preempted;private Object[] extentLocalCache;public Continuation (ContinuationScope scope, Runnable target) { this .scope = scope; this .target = target; }
Continuation
是一个双向链表设计,它的唯一一组构造参数是ContinuationScope
和Runnable
:
这里不深入研究内部StackChunk
、Pinned
等实现,直接看run
、enter
系列方法和yield
方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 public final void run () { while (true ) { mount(); JLA.setExtentLocalCache(extentLocalCache); if (done) throw new IllegalStateException ("Continuation terminated" ); Thread t = currentCarrierThread(); if (parent != null ) { if (parent != JLA.getContinuation(t)) throw new IllegalStateException (); } else this .parent = JLA.getContinuation(t); JLA.setContinuation(t, this ); try { boolean isVirtualThread = (scope == JLA.virtualThreadContinuationScope()); if (!isStarted()) { enterSpecial(this , false , isVirtualThread); } else { assert !isEmpty(); enterSpecial(this , true , isVirtualThread); } } finally { fence(); try { assert isEmpty () == done : "empty: " + isEmpty() + " done: " + done + " cont: " + Integer.toHexString(System.identityHashCode(this )); JLA.setContinuation(currentCarrierThread(), this .parent); if (parent != null ) parent.child = null ; postYieldCleanup(); unmount(); if (PRESERVE_EXTENT_LOCAL_CACHE) { extentLocalCache = JLA.extentLocalCache(); } else { extentLocalCache = null ; } JLA.setExtentLocalCache(null ); } catch (Throwable e) { e.printStackTrace(); System.exit(1 ); } } assert yieldInfo == null || yieldInfo instanceof ContinuationScope; if (yieldInfo == null || yieldInfo == scope) { this .parent = null ; this .yieldInfo = null ; return ; } else { parent.child = this ; parent.yield0((ContinuationScope)yieldInfo, this ); parent.child = null ; } } } private native static void enterSpecial (Continuation c, boolean isContinue, boolean isVirtualThread) ;@DontInline @IntrinsicCandidate private static void enter (Continuation c, boolean isContinue) { try { c.enter0(); } finally { c.finish(); } } private void enter0 () { target.run(); } private void finish () { done = true ; assert isEmpty () ; } public static boolean yield (ContinuationScope scope) { Continuation cont = JLA.getContinuation(currentCarrierThread()); Continuation c; for (c = cont; c != null && c.scope != scope; c = c.parent) ; if (c == null ) throw new IllegalStateException ("Not in scope " + scope); return cont.yield0(scope, null ); } private boolean yield0 (ContinuationScope scope, Continuation child) { preempted = false ; if (scope != this .scope) this .yieldInfo = scope; int res = doYield(); U.storeFence(); assert scope != this .scope || yieldInfo == null : "scope: " + scope + " this.scope: " + this .scope + " yieldInfo: " + yieldInfo + " res: " + res; assert yieldInfo == null || scope == this .scope || yieldInfo instanceof Integer : "scope: " + scope + " this.scope: " + this .scope + " yieldInfo: " + yieldInfo + " res: " + res; if (child != null ) { if (res != 0 ) { child.yieldInfo = res; } else if (yieldInfo != null ) { assert yieldInfo instanceof Integer; child.yieldInfo = yieldInfo; } else { child.yieldInfo = res; } this .yieldInfo = null ; } else { if (res == 0 && yieldInfo != null ) { res = (Integer)yieldInfo; } this .yieldInfo = null ; if (res == 0 ) onContinue(); else onPinned0(res); } assert yieldInfo == null ; return res == 0 ; } @IntrinsicCandidate private static int doYield () { throw new Error ("Intrinsic not installed" ); }
说实话,Continuation
源码的可读性比想象中低,连代码注释也留下了”丑陋的”这句吐槽。通过上面源码分析和上一节Continuation
的一个例子,可以得知Continuation#yield()
可以让程序代码中断,然后再次调用Continuation#run()
可以从上一个中断位置继续执行,JVM
在这个过程中为使用者屏蔽了Continuation
和运行此Continuation
的平台线程之间的交互细节,让使用者可以专注实际的任务开发即可。
4.2 VirtualThread 前面花了不少篇幅介绍Continuation
,它是一个全新的API
。已有的JUC
类库已经十分完善,如果可以把Continuation
融入到已有的JUC
体系,那么就可以通过线程池技术去管理运载线程,原有的大多数并发相关API
也能直接在协程体系中使用。从这个背景来看,创造一个Thread
类的全新子类用于融合JUC
和Continuation
是十分合适的,这样通过很小的改造成本就能通过Java
继承特性把这个全新子类适配JUC
体系,也能扩展一些API
让它适配协程新引入的特性,这个全新的子类就是java.lang.VirtualThread
:
VirtualThread
类的继承体系如下:
1 2 3 4 5 6 7 8 9 10 11 12 package java.lang;final class VirtualThread extends BaseVirtualThread { } package java.lang;sealed abstract class BaseVirtualThread extends Thread permits VirtualThread, ThreadBuilders.BoundVirtualThread { }
VirtualThread
是BaseVirtualThread
的子类,而BaseVirtualThread
是一个”密封类”,它是Thread
的子类,只对VirtualThread
和ThreadBuilders.BoundVirtualThread
开放,并且VirtualThread
是包私有访问权限的 同时用final
关键字修饰,无法被继承。接着看VirtualThread
的成员变量和构造函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 private static final Unsafe U = Unsafe.getUnsafe();private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope ("VirtualThreads" );private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler();private static final int TRACE_PINNING_MODE = tracePinningMode();private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state" );private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit" );private static final long CARRIER_THREAD = U.objectFieldOffset(VirtualThread.class, "carrierThread" );private static final long TERMINATION = U.objectFieldOffset(VirtualThread.class, "termination" );private final Executor scheduler;private final Continuation cont;private final Runnable runContinuation;private volatile int state;private static final int NEW = 0 ;private static final int STARTED = 1 ;private static final int RUNNABLE = 2 ; private static final int RUNNING = 3 ; private static final int PARKING = 4 ;private static final int PARKED = 5 ; private static final int PINNED = 6 ; private static final int YIELDING = 7 ; private static final int TERMINATED = 99 ; private static final int SUSPENDED = 1 << 8 ;private static final int RUNNABLE_SUSPENDED = (RUNNABLE | SUSPENDED);private static final int PARKED_SUSPENDED = (PARKED | SUSPENDED);private volatile boolean parkPermit;private volatile Thread carrierThread;private volatile CountDownLatch termination;VirtualThread(Executor scheduler, String name, int characteristics, Runnable task) { super (name, characteristics, false ); Objects.requireNonNull(task); if (scheduler == null ) { Thread parent = Thread.currentThread(); if (parent instanceof VirtualThread vparent) { scheduler = vparent.scheduler; } else { scheduler = DEFAULT_SCHEDULER; } } this .scheduler = scheduler; this .cont = new VThreadContinuation (this , task); this .runContinuation = this ::runContinuation; } private static class VThreadContinuation extends Continuation { VThreadContinuation(VirtualThread vthread, Runnable task) { super (VTHREAD_SCOPE, () -> vthread.run(task)); } @Override protected void onPinned (Continuation.Pinned reason) { if (TRACE_PINNING_MODE > 0 ) { boolean printAll = (TRACE_PINNING_MODE == 1 ); PinnedThreadPrinter.printStackTrace(System.out, printAll); } } } private void runContinuation () { if (Thread.currentThread().isVirtual()) { throw new WrongThreadException (); } boolean firstRun; int initialState = state(); if (initialState == STARTED && compareAndSetState(STARTED, RUNNING)) { firstRun = true ; } else if (initialState == RUNNABLE && compareAndSetState(RUNNABLE, RUNNING)) { setParkPermit(false ); firstRun = false ; } else { return ; } if (notifyJvmtiEvents) notifyJvmtiMountBegin(firstRun); try { cont.run(); } finally { if (cont.isDone()) { afterTerminate( true ); } else { afterYield(); } } } private void afterTerminate (boolean executed) { assert (state() == TERMINATED) && (carrierThread == null ); if (executed) { if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(true ); } CountDownLatch termination = this .termination; if (termination != null ) { assert termination.getCount() == 1 ; termination.countDown(); } if (executed) { threadContainer().onExit(this ); clearReferences(); } } private void afterYield () { int s = state(); assert (s == PARKING || s == YIELDING) && (carrierThread == null ); if (s == PARKING) { setState(PARKED); if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(false ); if (parkPermit && compareAndSetState(PARKED, RUNNABLE)) { lazySubmitRunContinuation(); } } else if (s == YIELDING) { setState(RUNNABLE); if (notifyJvmtiEvents) notifyJvmtiUnmountEnd(false ); lazySubmitRunContinuation(); } }
这里唯一的构造函数是比较复杂的,抛开一些钩子接口,最终想达到的效果就是:
1 Runnable.run()[runContinuation by carrier thread from executor] --> Continuation.run() --> Continuation.enter() --> VirtualThread.run() --> Runnable.run()[user task]
用户任务实际被包裹了很多层,在最里面一层才会回调。VirtualThread
中提供了两个静态全局的线程池实例,一个用于调度,一个用于唤醒,这里看看两个线程池是如何构造的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();private static final ScheduledExecutorService UNPARKER = createDelayedTaskScheduler();private static ForkJoinPool createDefaultScheduler () { ForkJoinWorkerThreadFactory factory = pool -> { PrivilegedAction<ForkJoinWorkerThread> pa = () -> new CarrierThread (pool); return AccessController.doPrivileged(pa); }; PrivilegedAction<ForkJoinPool> pa = () -> { int parallelism, maxPoolSize, minRunnable; String parallelismValue = System.getProperty("jdk.virtualThreadScheduler.parallelism" ); String maxPoolSizeValue = System.getProperty("jdk.virtualThreadScheduler.maxPoolSize" ); String minRunnableValue = System.getProperty("jdk.virtualThreadScheduler.minRunnable" ); if (parallelismValue != null ) { parallelism = Integer.parseInt(parallelismValue); } else { parallelism = Runtime.getRuntime().availableProcessors(); } if (maxPoolSizeValue != null ) { maxPoolSize = Integer.parseInt(maxPoolSizeValue); parallelism = Integer.min(parallelism, maxPoolSize); } else { maxPoolSize = Integer.max(parallelism, 256 ); } if (minRunnableValue != null ) { minRunnable = Integer.parseInt(minRunnableValue); } else { minRunnable = Integer.max(parallelism / 2 , 1 ); } Thread.UncaughtExceptionHandler handler = (t, e) -> { }; boolean asyncMode = true ; return new ForkJoinPool (parallelism, factory, handler, asyncMode, 0 , maxPoolSize, minRunnable, pool -> true , 30 , SECONDS); }; return AccessController.doPrivileged(pa); } private static ScheduledExecutorService createDelayedTaskScheduler () { String propValue = GetPropertyAction.privilegedGetProperty("jdk.unparker.maxPoolSize" ); int poolSize; if (propValue != null ) { poolSize = Integer.parseInt(propValue); } else { poolSize = 1 ; } ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(poolSize, task -> { return InnocuousThread.newThread("VirtualThread-unparker" , task); }); stpe.setRemoveOnCancelPolicy(true ); return stpe; }
对于默认调度器(DEFAULT_SCHEDULER
)的创建,它是一个ForkJoinPool
实例,构造参数的选取如下:
parallelism
参数由系统变量jdk.virtualThreadScheduler.parallelism
决定,默认值为Runtime.getRuntime().availableProcessors()
,如果配置了系统参数jdk.virtualThreadScheduler.maxPoolSize
则取min(parallelism,maxPoolSize)
maxPoolSize
参数由系统变量jdk.virtualThreadScheduler.maxPoolSize
决定,默认值为min(parallelism, maxPoolSize)
minRunnable
参数由系统变量jdk.virtualThreadScheduler.minRunnable
决定,默认值为max(parallelism / 2, 1)
asyncMode
参数固定值true
,也就是选用FIFO
模式
keepAliveTime
参数为固定值30
秒
saturate
参数在JDK17
引入,是一个Predicate
函数,在此固定返回true
,用于忽略minRunnable
值允许线程池饱和
线程工厂用于创建CarrierThread
实例,CarrierThread
是ForkJoinWorkerThread
的子类
在Intel 4C8T
开发机器环境中,该ForkJoinPool
实例创建时候的几个参数分别为:parallelism = 8, maxPoolSize = 256, minRunnable = 4
。
对于调度线程池(UNPARKER
)的创建,它是一个ScheduledThreadPoolExecutor
实例,构造参数的选取如下:
corePoolSize
参数由系统变量jdk.unparker.maxPoolSize
决定,并且确保最小值为1
线程工厂用于创建InnocuousThread
实例,线程名称为VirtualThread-unparker
接着看虚拟线程的启动方法start()
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 @Override public void start () { start(ThreadContainers.root()); } @Override void start (ThreadContainer container) { if (!compareAndSetState(NEW, STARTED)) { throw new IllegalThreadStateException ("Already started" ); } setThreadContainer(container); boolean started = false ; container.onStart(this ); try { inheritExtentLocalBindings(container); submitRunContinuation(); started = true ; } finally { if (!started) { setState(TERMINATED); container.onExit(this ); afterTerminate( false ); } } } private void submitRunContinuation () { submitRunContinuation(false ); } private void submitRunContinuation (boolean lazySubmit) { try { if (lazySubmit && scheduler instanceof ForkJoinPool pool) { pool.lazySubmit(ForkJoinTask.adapt(runContinuation)); } else { scheduler.execute(runContinuation); } } catch (RejectedExecutionException ree) { var event = new VirtualThreadSubmitFailedEvent (); if (event.isEnabled()) { event.javaThreadId = threadId(); event.exceptionMessage = ree.getMessage(); event.commit(); } throw ree; } }
ForkJoinPool#lazySubmit()
是JDK19
新增的一个API
,它的方法注释如下:
提交给定的任务,但不保证它最终会在没有可用活动线程的情况下执行。在某些上下文中,这种方法可以通过依赖于特定于上下文的知识来减少竞争和开销,即现有线程(如果在此池中操作,则可能包括调用线程)最终将可用来执行任务
使用此方法提交的目的就是希望可以用当前调用线程去执行任务,对于首次提交Continuation
任务可能作用不明显,但是对于Continuation.yield()
调用后的再次提交意义比较重大,因为这样就可以把运行的Continuation.run()
方法链分配到同一个运载线程实例 ,在开发者的角度就是虚拟线程任务执行中断后恢复执行,执行任务的运载线程没有改变。
源码中还可以发现,run()
方法覆盖了Thread#run()
替换为空实现,因为VirtualThread
最终是触发Continuation#run()
,这一点已经在start()
方法进行提交和调度。最后分析虚拟线程的阻塞(不带超时,也就是timeout = 0
)、限时阻塞(timeout > 0
)、join
的实现。先看相对简单的joinNanos()
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 boolean joinNanos (long nanos) throws InterruptedException { if (state() == TERMINATED) return true ; CountDownLatch termination = getTermination(); if (state() == TERMINATED) return true ; if (nanos == 0 ) { termination.await(); } else { boolean terminated = termination.await(nanos, NANOSECONDS); if (!terminated) { return false ; } } assert state () == TERMINATED; return true ; } private CountDownLatch getTermination () { CountDownLatch termination = this .termination; if (termination == null ) { termination = new CountDownLatch (1 ); if (!U.compareAndSetReference(this , TERMINATION, null , termination)) { termination = this .termination; } } return termination; }
接着看虚拟线程阻塞和限时阻塞的现实:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 void sleepNanos (long nanos) throws InterruptedException { assert Thread.currentThread() == this ; if (nanos >= 0 ) { if (ThreadSleepEvent.isTurnedOn()) { ThreadSleepEvent event = new ThreadSleepEvent (); try { event.time = nanos; event.begin(); doSleepNanos(nanos); } finally { event.commit(); } } else { doSleepNanos(nanos); } } } private void doSleepNanos (long nanos) throws InterruptedException { assert nanos >= 0 ; if (getAndClearInterrupt()) throw new InterruptedException (); if (nanos == 0 ) { tryYield(); } else { try { long remainingNanos = nanos; long startNanos = System.nanoTime(); while (remainingNanos > 0 ) { parkNanos(remainingNanos); if (getAndClearInterrupt()) { throw new InterruptedException (); } remainingNanos = nanos - (System.nanoTime() - startNanos); } } finally { setParkPermit(true ); } } } @Override void parkNanos (long nanos) { assert Thread.currentThread() == this ; if (getAndSetParkPermit(false ) || interrupted) return ; if (nanos > 0 ) { long startTime = System.nanoTime(); boolean yielded; Future<?> unparker = scheduleUnpark(nanos); setState(PARKING); try { yielded = yieldContinuation(); } finally { assert (Thread.currentThread() == this ) && (state() == RUNNING || state() == PARKING); cancel(unparker); } if (!yielded) { long deadline = startTime + nanos; if (deadline < 0L ) deadline = Long.MAX_VALUE; parkOnCarrierThread(true , deadline - System.nanoTime()); } } } private void parkOnCarrierThread (boolean timed, long nanos) { assert state () == PARKING; var pinnedEvent = new VirtualThreadPinnedEvent (); pinnedEvent.begin(); setState(PINNED); try { if (!parkPermit) { if (!timed) { U.park(false , 0 ); } else if (nanos > 0 ) { U.park(false , nanos); } } } finally { setState(RUNNING); } setParkPermit(false ); pinnedEvent.commit(); } @ChangesCurrentThread private Future<?> scheduleUnpark(long nanos) { Thread carrier = this .carrierThread; carrier.setCurrentThread(carrier); try { return UNPARKER.schedule(() -> unpark(), nanos, NANOSECONDS); } finally { carrier.setCurrentThread(this ); } } @ChangesCurrentThread private void cancel (Future<?> future) { if (!future.isDone()) { Thread carrier = this .carrierThread; carrier.setCurrentThread(carrier); try { future.cancel(false ); } finally { carrier.setCurrentThread(this ); } } } @Override @ChangesCurrentThread void unpark () { Thread currentThread = Thread.currentThread(); if (!getAndSetParkPermit(true ) && currentThread != this ) { int s = state(); if (s == PARKED && compareAndSetState(PARKED, RUNNABLE)) { if (currentThread instanceof VirtualThread vthread) { Thread carrier = vthread.carrierThread; carrier.setCurrentThread(carrier); try { submitRunContinuation(); } finally { carrier.setCurrentThread(vthread); } } else { submitRunContinuation(); } } else if (s == PINNED) { synchronized (carrierThreadAccessLock()) { Thread carrier = carrierThread; if (carrier != null && state() == PINNED) { U.unpark(carrier); } } } } } void tryYield () { assert Thread.currentThread() == this ; setState(YIELDING); try { yieldContinuation(); } finally { assert Thread.currentThread() == this ; if (state() != RUNNING) { assert state () == YIELDING; setState(RUNNING); } } } private boolean yieldContinuation () { boolean notifyJvmti = notifyJvmtiEvents; if (notifyJvmti) notifyJvmtiUnmountBegin(false ); unmount(); try { return Continuation.yield (VTHREAD_SCOPE); } finally { mount(); if (notifyJvmti) notifyJvmtiMountEnd(false ); } }
总的来说就是:
阻塞:通过Continuation.yield()
调用实现阻塞,主要是提供给Thread.sleep()
调用
限时阻塞:Continuation.yield()
调用之前计算唤醒时间并且向调度线程池(UNPARKER
)提交一个延时执行 的unpark
任务通过”懒提交”方式重新运行Continuation.run()
调用链解除阻塞,主要是提供给Thread.sleep(long nanos)
调用
join(Nanos)
:通过CountDownLatch.await()
调用实现阻塞,在虚拟线程终结钩子方法afterTerminate()
中调用CountDownLatch.countDown()
解除阻塞,join(Nanos)()
方法主要是提供给Thread.join()
调用
特殊情况:如果Continuation.yield()
调用失败,则会通过Unsafe
提供的park API
阻塞在运载线程上,在unpark
任务中通过Unsafe
提供的unpark API
解除阻塞
分析完虚拟线程实现的核心代码,这里总结一下虚拟线程的状态切换,由于支持的状态比较多,这里通过一张状态图进行展示:
还有其他像获取虚拟线程栈、JVM
状态通知、获取虚拟线程状态、状态切换的CAS
操作等方法限于篇幅这里就不展开分析。
4.3 线程建造器 线程建造器和线程工厂建造器用于快速创建平台线程实例、平台线程工厂实例、虚拟线程实例或者虚拟线程工厂实例。熟悉Builder
模式的开发者看这个新引入的功能源码应该比较轻松:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 @PreviewFeature(feature = PreviewFeature.Feature.VIRTUAL_THREADS) public sealed interface Builder permits Builder.OfPlatform, Builder.OfVirtual, ThreadBuilders.BaseThreadBuilder { Builder name (String name) ; Builder name (String prefix, long start) ; Builder allowSetThreadLocals (boolean allow) ; Builder inheritInheritableThreadLocals (boolean inherit) ; Builder uncaughtExceptionHandler (UncaughtExceptionHandler ueh) ; Thread unstarted (Runnable task) ; Thread start (Runnable task) ; ThreadFactory factory () ; @PreviewFeature(feature = PreviewFeature.Feature.VIRTUAL_THREADS) sealed interface OfPlatform extends Builder permits ThreadBuilders.PlatformThreadBuilder { @Override OfPlatform name (String name) ; @Override OfPlatform name (String prefix, long start) ; @Override OfPlatform allowSetThreadLocals (boolean allow) ; @Override OfPlatform inheritInheritableThreadLocals (boolean inherit) ; @Override OfPlatform uncaughtExceptionHandler (UncaughtExceptionHandler ueh) ; OfPlatform group (ThreadGroup group) ; OfPlatform daemon (boolean on) ; default OfPlatform daemon () { return daemon(true ); } OfPlatform priority (int priority) ; OfPlatform stackSize (long stackSize) ; } @PreviewFeature(feature = PreviewFeature.Feature.VIRTUAL_THREADS) sealed interface OfVirtual extends Builder permits ThreadBuilders.VirtualThreadBuilder { @Override OfVirtual name (String name) ; @Override OfVirtual name (String prefix, long start) ; @Override OfVirtual allowSetThreadLocals (boolean allow) ; @Override OfVirtual inheritInheritableThreadLocals (boolean inherit) ; @Override OfVirtual uncaughtExceptionHandler (UncaughtExceptionHandler ueh) ; } }
上面的Builder
接口都在java.lang.ThreadBuilders
中进行实现,因为整体实现比较简单,这里只看全新引入的VirtualThreadFactory
和VirtualThreadBuilder
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 private static class VirtualThreadFactory extends BaseThreadFactory { private final Executor scheduler; VirtualThreadFactory(Executor scheduler, String name, long start, int characteristics, UncaughtExceptionHandler uhe) { super (name, start, characteristics, uhe); this .scheduler = scheduler; } @Override public Thread newThread (Runnable task) { Objects.requireNonNull(task); String name = nextThreadName(); Thread thread = newVirtualThread(scheduler, name, characteristics(), task); UncaughtExceptionHandler uhe = uncaughtExceptionHandler(); if (uhe != null ) thread.uncaughtExceptionHandler(uhe); return thread; } } static Thread newVirtualThread (Executor scheduler, String name, int characteristics, Runnable task) { if (ContinuationSupport.isSupported()) { return new VirtualThread (scheduler, name, characteristics, task); } else { if (scheduler != null ) throw new UnsupportedOperationException (); return new BoundVirtualThread (name, characteristics, task); } } static final class VirtualThreadBuilder extends BaseThreadBuilder <OfVirtual> implements OfVirtual { private Executor scheduler; VirtualThreadBuilder() { } VirtualThreadBuilder(Executor scheduler) { if (!ContinuationSupport.isSupported()) throw new UnsupportedOperationException (); this .scheduler = Objects.requireNonNull(scheduler); } @Override public Thread unstarted (Runnable task) { Objects.requireNonNull(task); var thread = newVirtualThread(scheduler, nextThreadName(), characteristics(), task); UncaughtExceptionHandler uhe = uncaughtExceptionHandler(); if (uhe != null ) thread.uncaughtExceptionHandler(uhe); return thread; } @Override public Thread start (Runnable task) { Thread thread = unstarted(task); thread.start(); return thread; } @Override public ThreadFactory factory () { return new VirtualThreadFactory (scheduler, name(), counter(), characteristics(), uncaughtExceptionHandler()); } }
值得注意的是:虚拟线程实现上来看都是”守护线程”,也就是说虚拟线程不需要设置daemon
参数。平台线程或者虚拟线程的建造器或者工厂实现都是包访问权限的内部类,其父类使用了permits
关键字指定继承范围,目前是只能通过链式设置值的方式初始化,无法修改其中的成员或者方法。
五. 其他探讨 其他探讨主要包括:
自定义执行器
内存占用评估
局限性
适用场景
JUC
亲和性
5.1 自定义执行器 然虚拟线程建造器屏蔽了执行器Executor
实例的公共访问权限,在目前预留功能版本下只能所有虚拟线程的任务最终都是由全局的ForkJoinPool
执行,可以通过VarHandle
对其进行强制值设置,这样就能修改虚拟线程底层的载体线程为我们自定义线程池中的平台线程,例如这样:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 public class VirtualThreadCustomExecutor { public static void main (String[] args) throws Exception { ExecutorService carrier = Executors.newSingleThreadExecutor(runnable -> { Thread thread = new Thread (runnable); thread.setDaemon(true ); thread.setName("CustomVirtualCarrier" ); return thread; }); Thread.Builder.OfVirtual ofVirtual = Thread.ofVirtual(); Class<?> klass = Class.forName("java.lang.ThreadBuilders$VirtualThreadBuilder" ); VarHandle varHandle = MethodHandles.privateLookupIn(klass, MethodHandles.lookup()).findVarHandle(klass, "scheduler" , Executor.class); varHandle.set(ofVirtual, carrier); ThreadFactory factory = ofVirtual.name("VirtualWorker-" , 0 ).allowSetThreadLocals(false ).factory(); ExecutorService virtualWorkerPool = Executors.newThreadPerTaskExecutor(factory); virtualWorkerPool.execute(() -> { Thread thread = Thread.currentThread(); System.out.printf("first task ==> 线程名称:%s,载体线程名称:%s,是否虚拟线程:%s\n" , thread.getName(), getCurrentCarrierThreadName(thread), thread.isVirtual()); }); virtualWorkerPool.execute(() -> { Thread thread = Thread.currentThread(); System.out.printf("second task ==> 线程名称:%s,载体线程名称:%s,是否虚拟线程:%s\n" , thread.getName(), getCurrentCarrierThreadName(thread), thread.isVirtual()); }); Thread.sleep(Long.MAX_VALUE); } private static String getCurrentCarrierThreadName (Thread currentThread) { if (currentThread.isVirtual()) { try { MethodHandle methodHandle = MethodHandles.privateLookupIn(Thread.class, MethodHandles.lookup()) .findStatic(Thread.class, "currentCarrierThread" , MethodType.methodType(Thread.class)); Thread carrierThread = (Thread) methodHandle.invoke(); return carrierThread.getName(); } catch (Throwable e) { e.printStackTrace(); } } return "UNKNOWN" ; } } first task ==> 线程名称:VirtualWorker-0 ,载体线程名称:CustomVirtualCarrier,是否虚拟线程:true second task ==> 线程名称:VirtualWorker-1 ,载体线程名称:CustomVirtualCarrier,是否虚拟线程:true
可以看到最终效果,虚拟线程中的任务最终在自定义线程池中的唯一平台线程中运行。这里只是做一个实验性例子 ,使用反射或者MethodHandle
对未稳定的API
进行操作以后有很大概率会出现兼容性问题,不建议在生产环境这样操作,待虚拟线程完成预览正式发布后应该会提供对应的API
让开发者设置自定义执行器。
5.2 资源占用评估 平台线程(单个实例)的资源占用:
通常是预留1 mb
线程栈空间,额外需要16 kb
操作系统核心数据源结构
对于已经启动的平台线程实例,会占据2000+ byte
数据,包括VM
中平台线程的元数据等
虚拟线程(单个实例)的资源占用:
Continuation
栈会占据数百byte
到数百kb
内存空间
虚拟线程实例会占据200 - 240 byte
两者对比一看,理论上得知单个平台线程占用的内存空间至少是kb
级别的,而通常单个虚拟线程实例占用的内存空间是byte
级别,两者的内存占用相差1
个数量级。这里可以使用NMT
参数和jcmd
命令 进行验证,见下面的代码和结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class PlatformThreadFootprint { private static final int COUNT = 100000 ; public static void main (String[] args) throws Exception { for (int i = 0 ; i < COUNT; i++) { new Thread (() -> { try { Thread.sleep(Long.MAX_VALUE); } catch (Exception e) { e.printStackTrace(); } }, String.valueOf(i)).start(); } Thread.sleep(Long.MAX_VALUE); } }
上面的程序运行后启动10w
平台线程,通过NMT
参数和jcmd
命令查看所有线程占据的内存空间如下:
可见总已提交内存大部分来自创建的平台线程,这些平台线程占用了大概613 mb
空间,它们的总线程栈空间占用约为5862 mb
,两者加起来占据总使用内存(7495 mb
)的86 %
以上。用类似的方式编写运行虚拟线程的程序:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 public class VirtualThreadFootprint { private static final int COUNT = 100000 ; public static void main (String[] args) throws Exception { for (int i = 0 ; i < COUNT; i++) { Thread.startVirtualThread(() -> { try { Thread.sleep(Long.MAX_VALUE); } catch (Exception e) { e.printStackTrace(); } }); } Thread.sleep(Long.MAX_VALUE); } }
上面的程序运行后启动10w
虚拟线程,同样通过NMT
参数和jcmd
命令查看:
这里有意缩小虚拟线程程序的最小最大堆内存为-Xms10m -Xmx100m
,程序依然正常运行,并且堆内存的实际占用量和总内存的实际占用量都不超过200 mb
,由此可以证明虚拟线程确实在极大量创建的前提下不会占据大量内存空间(这里暂时没有考虑到复杂调用情况下Continuation
栈占据内存空间大小,不过已经大幅度优于平台线程)。
5.3 局限性 当前的虚拟线程实现有如下局限性:
Continuation
栈存在native
方法或者外部函数(FFM
的API
,见JEP-424
)调用不能进行yield
操作
当持有监视器或者等待监视器的时候(一般是使用了synchronized
关键字或者Object.wait()
)不能进行yield
操作
Continuation
栈存在native
方法调用、外部函数调用或者当持有监视器或者等待监视器的时候,虚拟线程会Pin
到平台线程,导致虚拟线程无法从平台线程卸载,虽然不会影响程序正确执行,但是会影响性能,也就是如果这些虚拟线程是可复用的,永远无法切换到其运载线程,导致任务切换开销永久性增大
虚拟线程可以像平台线程一样使用ThreadLocal
,但是由于一般虚拟线程实例是会大量创建的,ThreadLocal
本质是哈希表的一个链接,创建大量哈希表会带来额外的内存开销(这一点不算局限性,更接近于开发建议,建议使用虚拟线程的时候禁用ThreadLocal )
对于前三点出现的情况,一些文档中提到会导致虚拟线程无法从运载线程卸载,这个现象称为Pinned Thread
,通过系统参数jdk.tracePinnedThreads
可以打印具体的Pinned Thread
栈,从而定位到哪些虚拟线程被固定到哪些平台线程中。对于这个问题,目前可以通过编程规范去规避,也就是虚拟线程执行的任务尽量规避调用native
方法或者外部函数,对于synchronized
关键字可以使用JUC
中的锁API
进行替换,例如ReentrantLock
等等。
5.4 使用场景 基于继承的特性,通过对java.lang.Thread
(虚拟线程的超类)薄封装,也就是基于Thread
的API
可以直接透明地实现虚拟线程的挂起和恢复等操作,对使用者屏蔽了虚拟线程复杂的调度实现。由于虚拟线程实例占据的资源比较少,可以大量地创建而无须考虑池化,因此满足类似下面的使用场景:
大批量的处理时间较短的计算任务
大量的IO
阻塞等待处理
thread-per-request
风格的应用程序,例如主流的Tomcat
线程模型或者基于类似线程模型实现的SpringMVC
框架等等
5.5 JUC亲和性 还是基于继承的特性,java.lang.VirtualThread
是java.lang.Thread
子类型,因此使用到Thread
类型的地方原则上可以透明使用VirtualThread
,就是说通过下面的形式可以池化虚拟线程 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public class VirtualThreadPool { public static void main (String[] args) throws Exception { ThreadFactory factory = Thread.ofVirtual().allowSetThreadLocals(false ) .name("VirtualFactoryWorker-" , 0 ) .inheritInheritableThreadLocals(false ) .factory(); ThreadPoolExecutor fixedVirtualThreadPool = new ThreadPoolExecutor (10 , 10 , 0 , TimeUnit.SECONDS, new LinkedBlockingQueue <>(), factory); fixedVirtualThreadPool.execute(() -> { Thread thread = Thread.currentThread(); System.out.printf("线程名称:%s,是否虚拟线程:%s\n" , thread.getName(), thread.isVirtual()); }); fixedVirtualThreadPool.shutdown(); fixedVirtualThreadPool.awaitTermination(5 , TimeUnit.SECONDS); } }
但是前面也提到过:由于虚拟线程本身是轻量级的,在执行计算任务的时候更建议每个任务新创建一个虚拟线程实例,因为池化操作本身是会引入额外开销 。另外,JUC
下很多类库都是基于AQS
数据结构实现,而AQS
中无论独占模式还是共享模式,在队列中等待的节点以及抢占虚拟头节点的对象本质都是Thread
实例,基于这一点来看,AQS
也是无缝适配VirtualThread
。见下面的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class VirtualThreadJuc { public static void main (String[] args) throws Exception { CountDownLatch latch = new CountDownLatch (1 ); Thread.startVirtualThread(() -> { try { System.out.println("before await" ); latch.await(); System.out.println("after await" ); } catch (InterruptedException e) { e.printStackTrace(); } Thread thread = Thread.currentThread(); System.out.printf("线程名称:%s,是否虚拟线程:%s\n" , thread.getName(), thread.isVirtual()); }); Thread.sleep(1000 ); System.out.println("main count down" ); latch.countDown(); Thread.sleep(Long.MAX_VALUE); } }
运行结果
1 2 3 4 before await main count down after await 线程名称:,是否虚拟线程:true
总的来说,VirtualThread
与JUC
既有类库是亲和的,大部分类库可以在虚拟线程任务中使用,并且不建议池化虚拟线程而是使用per task per virtual thread
的编程模式。
六. 小结 本文详细介绍了平台线程与虚拟线程的区别、虚拟线程实现原理、虚拟线程的源码实现以及关于虚拟线程的一些探讨,希望能够帮到读者理解Java
虚拟线程。