写点什么

Java 虚拟线程探索

  • 2025-02-08
    福建
  • 本文字数:8184 字

    阅读完需:约 27 分钟

在 Java 21 中,引入了虚拟线程,这是一个非常非常重要的特性,之前一直苦苦寻找的 Java 协程,终于问世了。在高并发以及 IO 密集型的应用中,虚拟线程能极大的提高应用的性能和吞吐量。


什么是虚拟线程


先来看一下虚拟线程的概念。


虚拟线程概念


DK 21 引入了虚拟线程的支持,这是为了改善 Java 应用程序在高并发场景下的性能。虚拟线程是一种轻量级线程,具有较小的内存占用,能够更高效地进行上下文切换,适用于 I/O 密集型的应用程序


虚拟线程的工作原理


当应用程序启动一个虚拟线程时,JVM 会将这个虚拟线程交给 JVM 底层的线程池去执行,这个底层的线程池是一个传统线程池,并且真正执行虚拟线程中任务的线程,也是传统线程(操作系统线程)。当虚拟线程遇到阻塞时,JVM 会立刻将虚拟线程挂起,让其它虚拟线程执行。也就是说,开启一个虚拟线程,并不需要启用一个传统线程,一般一个传统线程,可以执行多个虚拟线程的任务。在执行过程中,可以把虚拟线程理解成任务 task。


这里举一个列子,假设用户创建了 1000 个虚拟线程,JVM 的执行虚拟线程的线程池线程数是 10,那么当第一个虚拟线程 V1 需要执行时,JVM 会将 V1 调度到传统线程 T1 上,以此类推,虚拟线程 V2 会被调度到传统线程 T2 上,那么 V3->T3,V4->T4,… V10->T10。当执行到 V11 时,这里有三种情况:


  • 如果 V1~V10 中有任何一个线程遇到阻塞,我们这里假设 V3 遇到阻塞,那么 JVM 会将 V3 挂起,此时 T3 线程可用,那么 V11 被 T3 执行。

  • 如果 V1~V10 没有线程被阻塞,那么 JVM 根据划分的时间片,假设每个虚拟线程允许执行 100ns,那么过了 100ns 后,这里 V1 最新执行,JVM 则将 V1 挂起,让 T1 去执行 V11。

  • 如果以上两种情况都不满足,那么先将 V11 挂起,等待有可用的传统线程时,再执行 V11。


对于被阻塞的线程,如 V3,当 IO 结束后,操作系统会通过事件,如 epoll 通知 JVM,V3 的 IO 操作已结束,此时 JVM 重新唤醒 V3,选择可用的传统线程,来执行 V3 的任务。


这里需要注意两点:


  • 虚拟线程 IO 执行完成后,会通过操作系统的事件通知机制,如 epoll 来通知 JVM。这一点对于虚拟线程的高效调度至关重要,因为它确保了 阻塞的 I/O 操作 不会占用操作系统线程的时间片,避免了传统线程池的高资源消耗和效率低下。。

  • JVM 在对虚拟线程进行上下文切换时,因为不涉及到操作系统级别的线程上下文切换,代价非常低,速度也非常快。


虚拟线程的调度


一般来说,程序员不需要对虚拟线程的调度进行管理,在 JDK 21 中,JVM 默认启用了虚拟线程,并且会使用默认的 ForkJoinPool 线程池来执行虚拟线程,并且线程池的大小,也会根据虚拟线程的数量,进行动态调整。如果需要手动管理执行虚拟线程的线程池大小,那么需要自定义线程池,并将虚拟线程交给自定义的线程池来执行,这样虽然可行,通常没有必要。


虚拟线程与传统线程区别


虚拟线程与传统线程的区别主要在于:


  • 创建虚拟线程时,JVM 不会创建一个操作系统线程,创建一个传统线程时,JVM 会创建一个操作系统线程。一个传统线程,可以轮询执行多个虚拟线程。

  • 虚拟线程是由传统线程来执行的,虚拟线程的调度由 JVM 控制,传统线程的执行和调度,由操作系统来控制。

  • 虚拟线程的上下文切换是由 JVM 控制的,因为不涉及到操作系统级别线程的上下文切换,虚拟线程上下文切换速度非常快,可以满足高并发需求。

  • 创建一个虚拟线程占用的内存非常小,相对而言,创建一个传统线程,占用的内存空间大。在应用中,可以创建大量的虚拟线程,一般支持到百万级,而创建传统线程,一般只能到几千,我们一般也不建议创建这么多传统线程。


虚拟线程类似于 task,传统系统与操作系统线程对应,一个传统线程可以执行多个虚拟线程。虚拟线程与 task 的区别是,当传统线程执行虚拟线程时,遇到阻塞会挂起虚拟线程,当传统线程执行 task 时,遇到阻塞就真的阻塞了。当然传统中的 task 继承自 runnable,虚拟线程继承自 Thread,他们属于不同的类,可调用的方法也不一样。


JDK 也提供了虚拟线程池,可以通过下面方式得到一个虚拟线程池。


import java.util.concurrent.*;
public class VirtualThreadPoolExample { public static void main(String[] args) { // 创建一个虚拟线程池 ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
// 提交多个任务到线程池 for (int i = 0; i < 10; i++) { final int taskId = i; executor.submit(() -> { System.out.println("Task " + taskId + " running in " + Thread.currentThread()); }); }
// 关闭线程池 executor.shutdown(); }}
复制代码


上面代码中,提交给线程池的任务,JVM 都会为其创建一个虚拟线程,然后以虚拟线程的方式执行。

与传统的线程池相比,虚拟线程池无法设置核心线程数、最大线程数、线程池大小、任务队列等参数,也不需要设置这些参数。


虚拟线程与传统线程的相同之处:


  • 他们都继承自 Thread,用法一摸一样。也都支持线程池。

  • 与传统一样,虚拟线程也有 new,runnable,waiting,blocked,terminated 等状态。

  • 所有的锁,同步机制,对虚拟线程都适用,并且与传统线程一样,虚拟线程也会有资源争夺以及状态同步问题。并且也有上下文切换,虽然虚拟线程的上下文切换,代价非常小。

  • 异常处理机制一样,如果遇到异常不处理,虚拟线程也会终止执行。


虚拟线程与协程的区别


协程是 python 中的异步编程技术,对于 IO 密集型应用,协程可以发挥很大的优势。协程的异步工作原理与虚拟线程相似,也是遇到 IO 就阻塞,让主线程继续执行其它任务,当 IO 完成时,操作系统通过事件机制,如 epoll,通知 python 进程,产生一个事件,放到 event loop 队列中,最后由主线程执行。


虚拟线程与协程的主要区别在于:



怎样使用虚拟线程


在 JDK 21 中,使用虚拟线程有两种方式:


  • 直接创建并启动虚拟线程。


public class VirtualThreadExample {    public static void main(String[] args) {        Thread virtualThread = Thread.ofVirtual().start(() -> {            System.out.println("Hello virtual thread ");        });
try { virtualThread.join(); // 等待虚拟线程完成 } catch (InterruptedException e) { e.printStackTrace(); } }}
复制代码


  • 通过线程池执行虚拟线程。


import java.util.concurrent.*;
public class VirtualThreadPoolExample { public static void main(String[] args) { // 创建一个虚拟线程池 ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
// 提交多个任务到线程池 for (int i = 0; i < 10; i++) { final int taskId = i; executor.submit(() -> { System.out.println("Task " + taskId + " running in " + Thread.currentThread()); }); }
// 关闭线程池 executor.shutdown(); }}
复制代码


通过线程池执行任务时,无法对并发实现控制,容易造成 OOM,或耗尽服务方资源,可以自定义以下虚拟线程池,实现资源控制:


package com.zengbiaobiao.demo.vitrualthreaddemo;
import org.springframework.lang.NonNull;
import java.util.ArrayList;import java.util.List;import java.util.concurrent.*;
/***** * 虚拟线程池,支持配置任务队列数和最大并发任务数 */public class VirtualThreadExecutorService extends AbstractExecutorService {
private volatile boolean shouldStop = false;
private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor(); private final Semaphore semaphore; private final BlockingQueue<Runnable> taskQueue;
/****** * 构造函数 * @param taskQueueSize,任务队列大小,任务队列是一个阻塞队列,如果任务队列满了,那么调用execute方法会阻塞 * @param concurrencySize,并发任务大小,同时执行的IO任务个数,防止并发过重,或者资源不够 */ public VirtualThreadExecutorService(int taskQueueSize, int concurrencySize) { this.semaphore = new Semaphore(concurrencySize); taskQueue = new LinkedBlockingQueue<>(taskQueueSize); this.loopEvent(); }
private void loopEvent() { Thread.ofVirtual().name("VirtualThreadExecutor").start(() -> { while (!shouldStop) { try { Runnable task = taskQueue.take(); semaphore.acquire(); executor.execute(() -> { try { try { task.run(); } finally { semaphore.release(); } } catch (Exception e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } }); } catch (InterruptedException e) { Thread.currentThread().interrupt(); if (shouldStop) break; } } }); }
@Override public void shutdown() { shouldStop = true; executor.shutdown(); }
/** * @return The task not executed */ @Override public List<Runnable> shutdownNow() { shouldStop = true; List<Runnable> remainingTasks = new ArrayList<>(taskQueue); taskQueue.clear(); executor.shutdownNow(); return remainingTasks; }
@Override public boolean isShutdown() { return shouldStop; }
@Override public boolean isTerminated() { return shouldStop && executor.isTerminated(); }
@Override public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return executor.awaitTermination(timeout, unit); }
@Override public void execute(Runnable command) { try { taskQueue.put(command); // 阻塞直到队列有空间 } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RejectedExecutionException("Task submission interrupted.", e); } }}
复制代码


测试代码如下:


package com.zengbiaobiao.demo.vitrualthreaddemo;
import org.apache.tomcat.util.threads.VirtualThreadExecutor;
public class VirtualThreadExecutorServiceDemo { public static void main(String[] args) throws InterruptedException {
VirtualThreadExecutorService executorService = new VirtualThreadExecutorService(10, 2);

for (int i = 0; i < 100000; i++) { final String threadName = "thread-" + i; System.out.println(Thread.currentThread() + ": try to create task " + threadName); executorService.submit(() -> { System.out.println(Thread.currentThread() + ": " + threadName + " created!"); try { Thread.sleep(2000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println(Thread.currentThread() + ": " + threadName + " finished!"); }); }
Thread.sleep(5000000);
}}
复制代码


哪些场景下可以应用虚拟线程


虚拟线程在 IO 密集型的高并发应用中能发挥出巨大的威力,在所有 IO 密集型应用中,具体来说,下列场景中,使用虚拟线程是比较合适的:


  • 短时间需要完成的任务,且没有资源争夺或乱序问题,比如数据库写入,服务器 HTTP 请求处理,远程 RESTful API 调用,RabbitMQ 消息处理等应用场景。。

  • 长时间运行的任务,但是对消息处理由顺序要求的任务。比如在电梯监控系统中,需要对每台电梯的数据进行处理,但是需要保证消息被处理的顺序。这时可以为每台电梯创建一个虚拟线程,这台电梯的数据交给专门的虚拟线程处理。因为应用中可以创建大量虚拟线程,并且虚拟线程一般都是异步处理任务,所以这个场景中,使用虚拟线程,可以满足高性能和高并发的要求。

  • API 网关中,对多个上游 API 数据进行查询,组装合并,使用虚拟线程,相比传统线程,效果更佳。虚拟线程,也支持 CountDownLatch,Semaphore 等工具类。

  • 事件驱动的架构中,使用虚拟线程,效果也很好。比如 spring boot 中的异步事件,默认使用的是传统线程池,如果将其改成虚拟线程池,并发处理能力可以极大提高。


那么哪些场景下不合适使用虚拟线程呢?


  • CPU 密集型应用,比如大数据处理、图像处理、矩阵运算等。

  • 如果应用有很高的并发资源争夺,或者状态同步,并且造成系统吞吐量低,需要考虑优化并发模型,这种场景下,不但传统线程不合适,虚拟线程也不合适。


虚拟线程实际应用场景举例


在一个 spring boot 项目中,有时候因为异步事件处理不过来,造成吞吐量下降,在 JDK 21 中,可以将事件改成虚拟线程来执行,代码如下:


package com.zengbiaobiao.demo.vitrualthreaddemo;
import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.scheduling.annotation.EnableAsync;
import java.util.concurrent.Executor;import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;import java.util.concurrent.Semaphore;
@Configuration@EnableAsyncpublic class AsyncConfig {
@Bean(name = "taskExecutor") public Executor taskExecutor() { // 最大并行任务数 Semaphore semaphore = new Semaphore(100); ExecutorService virtualThreadPool = Executors.newVirtualThreadPerTaskExecutor();
return runnable -> { try { // 控制并行任务数 semaphore.acquire(); virtualThreadPool.submit(() -> { try { runnable.run(); } finally { semaphore.release(); } }); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Task submission interrupted", e); } }; }}
复制代码


事件发送和处理代码如下:


package com.zengbiaobiao.demo.vitrualthreaddemo;
import org.springframework.context.ApplicationEventPublisher;import org.springframework.context.event.EventListener;import org.springframework.scheduling.annotation.Async;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;
@RestController@RequestMapping("/home")public class HomeController {
private final ApplicationEventPublisher eventPublisher;
public HomeController(ApplicationEventPublisher eventPublisher) { this.eventPublisher = eventPublisher; }
@GetMapping("/index") public String index() { for (int i = 0; i < 1000; i++) { eventPublisher.publishEvent("event " + i); } return "success"; }
@EventListener @Async public void handleEvent(String event) { System.out.println(Thread.currentThread() + ": " + event); try { Thread.sleep(100); } catch (InterruptedException e) { throw new RuntimeException(e); } }}
复制代码


输出结果如下:


VirtualThread[#2031]/runnable@ForkJoinPool-1-worker-4: event 976VirtualThread[#2039]/runnable@ForkJoinPool-1-worker-1: event 980VirtualThread[#1064]/runnable@ForkJoinPool-1-worker-1: event 983VirtualThread[#2047]/runnable@ForkJoinPool-1-worker-2: event 984VirtualThread[#2049]/runnable@ForkJoinPool-1-worker-9: event 985VirtualThread[#2057]/runnable@ForkJoinPool-1-worker-2: event 989VirtualThread[#2059]/runnable@ForkJoinPool-1-worker-3: event 990VirtualThread[#2061]/runnable@ForkJoinPool-1-worker-6: event 991VirtualThread[#2063]/runnable@ForkJoinPool-1-worker-10: event 992VirtualThread[#2065]/runnable@ForkJoinPool-1-worker-10: event 993VirtualThread[#2071]/runnable@ForkJoinPool-1-worker-3: event 996VirtualThread[#2069]/runnable@ForkJoinPool-1-worker-2: event 995VirtualThread[#2075]/runnable@ForkJoinPool-1-worker-7: event 998VirtualThread[#2077]/runnable@ForkJoinPool-1-worker-10: event 999
复制代码


上面输出结果中,每次并发执行 100 个任务,当虚拟线程池任务达到 100 之后,执行 eventPublisher.publishEvent("event " + i)代码时,代码阻塞,过 100ms 之后,100 个任务执行完成,下一批任务被执行。


虚拟线程使用注意事项


  • 搞清楚任务类型,是 IO 密集型,还是 CPU 密集型

  • 与传统线程结合使用

  • 关注性能和资源,使用虚拟线程无法通过线程池等工具控制并发,需要借助 Semepha,CountdownLatch 等工具才能限流,如果不限流,容易造成 OOM,或对目标系统造成巨大流量冲击。

  • 在异步框架中,关注隐藏的传统线程,比如在 HttpClient 的异步请求中,每次异步请求都会创建一个 HttpClient 回调线程。大量的传统线程被间接创建,也容易引起 OOM。

  • 由 synchronized 关键字引起的 pinned 问题,看起来在 JDK 21 中,做了一些优化,即便虚拟线程 pinned 到传统线程,也只是性能退回到传统线程,无非是慢一点,反而不是太大问题。经过大量测试,发现基本只出现一次,之后不会再出现。不过使用 ReentrantLock,效果确实会好很多,将 synchronized 关键字改成 lock.()和 lock.unlock(),ForkJoinPool 中的线程数量会降低,并且任务分配均衡。

  • 不要忽略软件设计,尤其在需要大量同步的应用中。


经过验证,虚拟线程在遇到 IO 时,确实会让步,并且不消耗太多资源,核心特点是,让异步编程变得简单,并且不需要框架支持。但是容易因大的并发,造成 OOM,或者对目标系统造成冲击,追求高并发可用,但一定要做测试和验证。对于需要做状态同步,如需要加锁,或需要使用 synchronize 关键字的代码,需要优化设计,如果无法规避,那么,使用虚拟线程,和使用线程池,效果差不多。


虚拟线程存在的问题:

Java Virtual Threads — some early gotchas to look out for

Two Pitfalls by moving to Java Virtual Threads

Java 21 Virtual Threads - Dude, Where’s My Lock?

Pitfalls to avoid when switching to Virtual threads

Do Java 21 virtual threads address the main reason to switch to reactive single-thread frameworks?

Pinning: A pitfall to avoid when using virtual threads in Java

Taming the Virtual Threads: Embracing Concurrency With Pitfall Avoidance

Pitfalls you encounter with virtual threads


文章转载自:曾彪彪

原文链接:https://www.cnblogs.com/zengbiaobiao2016/p/18702338

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
Java虚拟线程探索_Java_快乐非自愿限量之名_InfoQ写作社区