写点什么

JUC 并发—Thread 源码分析及案例应用

  • 2025-02-14
    福建
  • 本文字数:21085 字

    阅读完需:约 69 分钟

1.什么是线程以及并发编程


(1)线程是什么


public class HelloWorld {    public static void main(String[] args) {        System.out.println("Hello World......");        new Thread() {            public void run() {                System.out.println("另外一个线程干的事儿......");            }        }.start();        //还有一些其他的代码    }}
复制代码


如果要执行这段代码,可以在 IDEA 中直接运行这个 main 方法,IDEA 在执行上述的 main 方法相当于启动一个 JVM 进程。

 

JVM 进程里会有很多线程,首先第一个线程就是 main 线程,main 线程就是负责执行 main 方法里的那些代码。只要 main 线程执行完这些代码后,JVM 进程默认就会直接退出。

 

除了在 IDEA 直接运行一个 main 方法可以启动一个 JVM 进程外,也可通过 java -jar 命令启动一个 JVM 进程(此时需指定执行一个 main 方法)。

 

(2)多线程并发的本质


一个 JVM 进程除了会默认开启 main 线程,还能在 main 线程开启其他线程。比如可通过 Thread 类开启别的线程,别的线程是和 main 线程同时运行的。

 

这些线程没有先后顺序,多线程并发运行的时候,本质是 CPU 在执行各个线程的代码。一个 CPU 会有一个时间片算法,一会儿执行 main 线程、一会儿执行 Thread 线程,看起来两个线程同时运行。只不过 CPU 执行每个线程的时间特别短,可能执行一次就几微妙,所以看起来好像是多个线程在并发执行一样。

 

(3)多线程的基本原理


当执行 start()方法启动线程时,会先在 JVM 层面创建一个线程,JVM 会根据当前操作系统的类型调用相关指令来创建线程并启动。

 

线程启动后,并不会立即运行,而是要等到操作系统的 CPU 调度算法,把当前线程分配给某个 CPU 才执行。线程被分配执行后,会回调线程中的 run()方法执行相关指令。

 

(4)线程的运行状态


New(新建状态):

调用 new Thread()时的状态;

 

Runnable(运行状态):

调用 start()方法启动线程后的状态;

 

Blocked(阻塞状态):

线程执行 synchronized 代码但未抢到锁的状态;

 

Waiting(等待状态):

调用 Object.wait()等方法时的状态;

 

Timed_Waiting(等待超时状态):

调用 sleep(timeout)时的状态;

 

Terminated(终止状态):

线程的 run()方法执行完后的状态;

 

(5)线程上下文切换


一.什么是线程上下文切换


由于同时运行的线程数是由 CPU 核数来决定的,所以为了支持更多线程运行,CPU 会把自己的时间片轮流分配给其他线程,这个过程就是上下文切换。

 

二.导致线程上下文切换的原因


多个任务抢占 synchronized 同步锁,线程运行遇到 IO 阻塞时 CPU 调度器会切换 CPU 时间片。在线程中通过主动阻塞当前线程的方法释放 CPU 时间片,当前线程执行完成后释放 CPU 时间片。

 

三.如何减少线程上下文切换


方法一:减少线程数

方法二:采用无锁设计解决线程竞争问题

方法三:采用 CAS 做自旋操作

 

(6)并发编程涉及的内容


一.控制多线程实现系统功能

二.Java 内存模型以及 volatile 关键字

三.线程同步以及通信

四.锁优化

五.并发包以及线程池

 

2.微服务注册中心案例


(1)并发编程问题下的案例


并发编程一般用在底层分布式系统、底层分布式中间件中。比如微服务注册中心、大数据分布式系统,就大量使用了并发编程的技术。比如一些中间件 Dubbo、RocketMQ,也会大量使用并发编程的技术。

 

可以通过介绍两个简单案例,来说明应该如何在系统底层使用并发编程。比如微服务注册中心系统的案例、分布式存储系统的案例。其中 HDFS 的 edits log 机制就会通过 volatile 关键字来控制一个读写开关。

 

(2)微服务注册中心的案例


在一个微服务系统中,会有大量的微服务,各个服务之间会产生互相调用。但一开始服务间是不知道对方的位置的,这时就需要一个微服务注册中心。各个服务都把自己的地址注册到微服务注册中心,包括机器、端口等。然后其他服务就可以从注册中心拉取各服务信息,感知到其他服务的存在。

 

微服务注册中心会在内存里维护这些各服务注册的数据,可以理解为注册表。于是当服务注册和下线时,就会写微服务注册中心的这个注册表。同时,各个服务也会不时读取微服务注册中心的这个注册表的信息。所以这个注册表就会存在多线程并发读写冲突的问题,这时就会涉及相关的并发编程、锁冲突、锁优化的问题。


 

3.以工作线程模式开启微服务的注册和心跳线程


(1)创建和启动一个线程的主要方法


继承 Thread 类、实现 Runnable 接口、使用 ExecutorService 线程池、使用 Callable/Future 实现带有返回值的多线程。


//1.扩展Thread类new Thread() {    @Override    public void run() {                }}.start();
public class MyThread extends Thread { @Override public void run() { }}new MyThread().start();
//2.实现Runnablenew Thread(new Runnable() { @Override public void run() { }}).start();
public class MyRunnable implements Runnable { public void run() { }}new Thread(new MyRunnable()).start();
复制代码


(2)微服务注册中心的两个工程


一.register-server 负责接收各个服务的请求


register-server 是可以独立部署和启动的,启动之后会接收和处理各个服务发送过来的请求,进行注册、心跳、下线。

 

可以认为 register-server 有一个 main 方法,直接运行该 main 方法就会启动。类似 Spring Boot,直接运行其 main 方法就会启动一个 Web 服务器。

 

二.register-client 是一个依赖包,各个服务需要引入这个依赖


各服务向微服务注册中心进行注册时,可以开启一个线程去执行。各服务向微服务注册中心发送心跳时,也可以开启一个线程去发送。

 

从而让服务启动时,就可以通过 register-client 这个依赖包,向 register-server 完成服务注册和心跳通知。

 

(3)工作线程 RegisterClientWorker 的实现


引入了 register-client 依赖包的服务,在启动 JVM 进程后,会有一个 main 线程,以及一个 RegisterClientWorker 线程。

 

main 线程会负责启动 RegisterClientWorker 线程,但 main 线程结束了以后,JVM 进程不会退出。因为有一个工作线程 RegisterClientWorker 会一直在运行,只要工作线程一直在运行没有结束,那么 JVM 进程是不会退出的。

 

下面是 register-client 的代码,其中 RegisterClientWorker 可以做成一个内部类。因为从面向对象角度来说,RegisterClientWorker 只属于一个 RegisterClient。


//在服务上被创建和启动,负责跟register-server进行通信public class RegisterClient {    //服务实例id    private String serviceInstanceId;      public RegisterClient() {        this.serviceInstanceId = UUID.randomUUID().toString().replace("-", "");    }      public void start() {        //一旦启动了这个register-client组件之后,它就负责在服务上做两件事情        //第一件事情,就是开启一个线程向register-server去发送请求,注册这个服务        //第二件事情,就是在注册成功之后,开启另外一个线程去发送心跳请求              //可以简化一下这个模型:        //这个线程刚启动时,首先要做的事情就是完成注册        //完成注册后,就会进入一个while true死循环        //然后每隔30秒就发送一个请求去进行心跳        new RegisterClientWorker(serviceInstanceId).start();    }}
//通过内部类实现,负责向register-server发起注册申请的线程private class RegisterClientWorker extends Thread { public static final String SERVICE_NAME = "inventory-service"; public static final String IP = "192.168.31.207"; public static final String HOSTNAME = "inventory01"; public static final int PORT = 9000; //http通信组件 private HttpSender httpSender; //是否完成服务注册 private Boolean finishedRegister; //服务实例id private String serviceInstanceId;
public RegisterClientWorker(String serviceInstanceId) { this.httpSender = new HttpSender(); this.finishedRegister = false; this.serviceInstanceId = serviceInstanceId; }
@Override public void run() { if (!finishedRegister) { //获取当前机器的信息,包括当前机器的ip地址、hostname,以及你配置这个服务监听的端口号 RegisterRequest registerRequest = newRegisterRequest(); registerRequest.setServiceName(SERVICE_NAME); registerRequest.setIp(IP); registerRequest.setHostname(HOSTNAME); registerRequest.setPort(PORT); registerRequest.setServiceInstanceId(serviceInstanceId); RegisterResponse registerResponse = httpSender.register(registerRequest); System.out.println("服务注册的结果是:" + registerResponse.getStatus() + "......"); //如果注册成功 if (RegisterResponse.SUCCESS.equals(registerResponse.getStatus())) { finishedRegister = true; } else { return; } } //如果注册成功,就进入while true死循环 if (finishedRegister) { HeartbeatRequest heartbeatRequest = new HeartbeatRequest(); heartbeatRequest.setServiceInstanceId(serviceInstanceId); HeartbeatResponse heartbeatResponse = null; while(true) { try { heartbeatResponse = httpSender.heartbeat(heartbeatRequest); System.out.println("心跳的结果为:" + heartbeatResponse.getStatus() + "......"); Thread.sleep(30 * 1000); } catch (Exception e) { e.printStackTrace(); } } } }}
//负责发送各种http请求的组件public class HttpSender { //发送注册请求 public RegisterResponse register(RegisterRequest request) { //基于类似HttpClient这种开源的网络包构造一个请求 //里面放入这个服务实例的信息,比如服务名称,ip地址,端口号,然后通过这个请求发送过去 System.out.println("服务实例【" + request + "】,发送请求进行注册......"); //收到register-server响应之后,封装一个Response对象 RegisterResponse response = new RegisterResponse(); response.setStatus(RegisterResponse.SUCCESS); return response; } //发送心跳请求 public HeartbeatResponse heartbeat(HeartbeatRequest request) { System.out.println("服务实例【" + request.getServiceInstanceId() + "】,发送请求进行心跳......"); HeartbeatResponse response = new HeartbeatResponse(); response.setStatus(RegisterResponse.SUCCESS); return response; }}
//register-client组件的测试类//在别的服务启动时,会引入RegisterClient依赖包,需要对RegisterClient进行实例化public class RegisterClientTest { public static void main(String[] args) { RegisterClient registerClient = new RegisterClient(); registerClient.start(); }}
复制代码


4.微服务注册中心的服务注册功能


(1)微服务注册中心扫描微服务存活的线程


各个服务向微服务注册中心发起注册请求时,微服务注册中心会将各服务的信息保存到内存的一个注册表里,然后各服务在运行过程中会不断发送心跳通知微服务注册中心自己还活着。

 

所以微服务注册中心需要开启一个后台线程,不断地扫描和监控注册表里各个服务的心跳发送时间。如果发现某服务超过一定时间没有发送心跳,那么将该服务从注册表中摘除。

 

(2)开启线程的两种常见方法


一是继承 Thread 类并重写 run()方法,然后创建实例执行 start()方法启动线程;

二是实现 Runnable 接口并封装运行逻辑,然后传入 Thread 类的构造方法中;

 

(3)微服务注册中心 register-server 的实现


register-server 一般以 Web 工程的模式来启动,它会类似于 Spring Web MVC 提供一些 HTTP 接口,让各个服务的 register-client 组件可以发送 HTTP 请求过来并进行处理。并且每隔 60 秒检查某个服务是否已经超过 90 秒都没有更新心跳并续约,如果是那么就认为该服务已经不存活了,需要摘除这个服务实例。当然微服务注册中心也可以通过 RPC 的模式来实现。

 

一.核心的内存数据结构之注册表


Map:key 是服务实例 ID,value 是服务实例信息。key 是服务名称,比如库存服务。value 是这个服务的所有的服务实例,比如提供库存服务的 2 台机器。


//注册表public class Registry {    //注册表是一个单例    private static Registry instance = new Registry();
private Registry() { }
public static Registry getInstance() { return instance; }
//核心的内存数据结构:注册表 //Map<String, ServiceInstance>:key是服务名称,比如库存服务,value是这个服务的所有的服务实例,比如提供库存服务的2台机器 private Map<String, Map<String, ServiceInstance>> registry = new HashMap<String, Map<String, ServiceInstance>>(); //服务注册 public void register(ServiceInstance serviceInstance) { Map<String, ServiceInstance> serviceInstanceMap = registry.get(serviceInstance.getServiceName()); if (serviceInstanceMap == null) { serviceInstanceMap = new HashMap<String, ServiceInstance>(); registry.put(serviceInstance.getServiceName(), serviceInstanceMap); } serviceInstanceMap.put(serviceInstance.getServiceInstanceId(), serviceInstance); System.out.println("服务实例【" + serviceInstance + "】,完成注册......"); System.out.println("注册表:" + registry); }
//获取服务实例 public ServiceInstance getServiceInstance(String serviceName, String serviceInstanceId) { Map<String, ServiceInstance> serviceInstanceMap = registry.get(serviceName); return serviceInstanceMap.get(serviceInstanceId); }
//获取整个注册表 public Map<String, Map<String, ServiceInstance>> getRegistry() { return registry; }
//从注册表删除一个服务实例 public void remove(String serviceName, String serviceInstanceId) { System.out.println("服务实例【" + serviceInstanceId + "】,从注册表中进行摘除"); Map<String, ServiceInstance> serviceInstanceMap = registry.get(serviceName); serviceInstanceMap.remove(serviceInstanceId); }}
复制代码


二.负责接收 register-client 请求的 Controller


//这个Controller是负责接收register-client发送过来的请求的public class RegisterServerController {    private Registry registry = Registry.getInstance();    		//服务注册    public RegisterResponse register(RegisterRequest registerRequest) {        RegisterResponse registerResponse = new RegisterResponse();        try {            ServiceInstance serviceInstance = new ServiceInstance();            serviceInstance.setHostname(registerRequest.getHostname());            serviceInstance.setIp(registerRequest.getIp());            serviceInstance.setPort(registerRequest.getPort());            serviceInstance.setServiceInstanceId(registerRequest.getServiceInstanceId());            serviceInstance.setServiceName(registerRequest.getServiceName());            registry.register(serviceInstance);            registerResponse.setStatus(RegisterResponse.SUCCESS);        } catch (Exception e) {            e.printStackTrace();            registerResponse.setStatus(RegisterResponse.FAILURE);        }        return registerResponse;    }}
复制代码


三.微服务注册中心的后台处理


//微服务注册中心后台处理public class RegisterServer {    public static void main(String[] args) throws Exception {        RegisterServerController controller = new RegisterServerController();        String serviceInstanceId = UUID.randomUUID().toString().replace("-", "");        //模拟发起一个服务注册的请求        RegisterRequest registerRequest = new RegisterRequest();        registerRequest.setHostname("inventory-service-01");        registerRequest.setIp("192.168.31.208");        registerRequest.setPort(9000);        registerRequest.setServiceInstanceId(serviceInstanceId);        registerRequest.setServiceName("inventory-service");        controller.register(registerRequest);        //模拟进行一次心跳,完成续约        ...        //开启一个后台线程,检测微服务的存活状态        ...        while(true) {            Thread.sleep(30 * 1000);        }    }}
复制代码


5.微服务注册中心的心跳续约功能


(1)处理 register-client 发送过来的心跳请求


只要 register-server 处理了 register-client 发送过来的一次心跳,那么就相当于维护了 register-client 和 register-server 之间的一份契约,也就是维持了 register-client 在 register-server 中还存活的状态。分布式系统中大都有心跳续约的机制。


public class RegisterServerController {    private Registry registry = Registry.getInstance();
//处理register-client发送过来的心跳请求 public HeartbeatResponse heartbeat(HeartbeatRequest heartbeatRequest) { HeartbeatResponse heartbeatResponse = new HeartbeatResponse(); try { ServiceInstance serviceInstance = registry.getServiceInstance(heartbeatRequest.getServiceName(), heartbeatRequest.getServiceInstanceId()); serviceInstance.renew(); heartbeatResponse.setStatus(HeartbeatResponse.SUCCESS); } catch (Exception e) { e.printStackTrace(); heartbeatResponse.setStatus(HeartbeatResponse.FAILURE); } return heartbeatResponse; }}
复制代码


(2)包含契约信息的微服务实例


下面是 ServiceInstance 服务实例,包含了契约信息。从面向对象的角度出发,一个契约一定是属于一个服务实例的。契约不可能独立存在,所以契约做成内部类比较好。


//ServiceInstance代表了一个微服务实例//里面包含了一个服务实例的所有信息,比如服务名称、ip地址、hostname、端口号、服务实例id、契约信息(Lease)public class ServiceInstance {    //判断一个服务实例不再存活的周期    private static final Long NOT_ALIVE_PERIOD = 90 * 1000L;    //服务名称    private String serviceName;    //ip地址    private String ip;    //主机名    private String hostname;    //端口号    private int port;    //服务实例id    private String serviceInstanceId;    //契约    private Lease lease;       public ServiceInstance() {        this.lease = new Lease();    }   
//服务续约 public void renew() { this.lease.renew(); }
//服务实例是否存活 public Boolean isAlive() { return this.lease.isAlive(); } //Lease代表的是契约,做成内部类 //维护了一个服务实例跟当前的这个注册中心之间的联系,包括了心跳的时间,创建的时间等 private class Lease { //最近一次心跳的时间 private Long latestHeartbeatTime = System.currentTimeMillis(); //续约,只要处理一次心跳请求,就相当于维护了register-client和register-server之间的契约 public void renew() { this.latestHeartbeatTime = System.currentTimeMillis(); System.out.println("服务实例[" + serviceInstanceId + "],进行续约:" + latestHeartbeatTime); } //判断当前服务实例的契约是否还存活 public Boolean isAlive() { Long currentTime = System.currentTimeMillis(); if (currentTime - latestHeartbeatTime > NOT_ALIVE_PERIOD) { System.out.println("服务实例[" + serviceInstanceId + "],不再存活"); return false; } System.out.println("服务实例[" + serviceInstanceId + "],保持存活"); return true; } } ...}
复制代码


(3)模拟处理一次心跳请求完成续约


//微服务注册中心的后台处理public class RegisterServer {       public static void main(String[] args) throws Exception {        RegisterServerController controller = new RegisterServerController();        String serviceInstanceId = UUID.randomUUID().toString().replace("-", "");        //模拟发起一个微服务注册的请求        ...        //模拟发起一次微服务的心跳请求,以完成续约        HeartbeatRequest heartbeatRequest = new HeartbeatRequest();        heartbeatRequest.setServiceName("inventory-service");          heartbeatRequest.setServiceInstanceId(serviceInstanceId);         controller.heartbeat(heartbeatRequest);        //开启一个后台线程,检测微服务的存活状态        ...        while(true) {            Thread.sleep(30 * 1000);          }    }}
复制代码


6.微服务的存活状态监控线程


(1)监控微服务是否存活的线程的实现


//微服务存活状态监控组件public class ServiceAliveMonitor {    //检查服务实例是否存活的间隔    private static final Long CHECK_ALIVE_INTERVAL = 60 * 1000L;    //负责监控微服务存活状态的后台线程    private Daemon daemon = new Daemon();
//启动后台线程 public void start() { daemon.start(); }
//负责监控微服务存活状态的后台线程 private class Daemon extends Thread { private Registry registry = Registry.getInstance(); @Override public void run() { Map<String, Map<String, ServiceInstance>> registryMap = null; while(true) { try { registryMap = registry.getRegistry(); for (String serviceName : registryMap.keySet()) { Map<String, ServiceInstance> serviceInstanceMap = registryMap.get(serviceName); for (ServiceInstance serviceInstance : serviceInstanceMap.values()) { //如果服务实例距离上一次发送心跳已经超过90秒了,则认为这个服务死了,需要从注册表中摘除这个服务实例 if (!serviceInstance.isAlive()) { registry.remove(serviceName, serviceInstance.getServiceInstanceId()); } } } Thread.sleep(CHECK_ALIVE_INTERVAL); } catch (Exception e) { e.printStackTrace(); } } } }}
复制代码


(2)开启一个后台线程来检测微服务的存活状态


//微服务注册中心的后台处理public class RegisterServer {       public static void main(String[] args) throws Exception {        ...        //模拟发起一个微服务注册的请求        ...        //模拟微服务发起一次心跳请求,来完成续约        ...        //开启一个后台线程,检测微服务的存活状态        ServiceAliveMonitor serviceAliveMonitor = new ServiceAliveMonitor();        serviceAliveMonitor.start();        while(true) {            Thread.sleep(30 * 1000);          }    }}
复制代码


7.以 daemon 模式运行微服务的存活监控线程


(1)非 daemon 线程和 daemon 线程


一.非 daemon 线程(工作线程)


一般来说,工作线程就是非 daemon 线程,后台线程是 daemon 线程。默认创建的线程就是非 daemon 的,称之为工作线程。

 

如果 main()方法启动后要开启几个无限循环工作的线程来处理一些请求,比如类似于 Web 服务器的,那么这些无限循环工作的线程就是工作线程。main 方法执行完后,因工作线程一直在运行,所以 JVM 进程不会退出。

 

二.daemon 线程


daemon 线程指的是:当工作线程都停止时,而自动退出的线程。比如 main 线程都执行完了,那么 daemon 线程会跟着 JVM 进程一起退出。daemon 线程不会像工作线程一样阻止 JVM 进程退出。

 

(2)设置监控微服务存活的线程为 daemon 线程


比如当微服务注册中心负责接收请求的核心工作线程,由于某些原因停止后,那么这个微服务注册中心必须停止。但如果监控微服务存活状态的线程一直在 while 循环中运行着,那么会导致微服务注册中心无法停止,因为 JVM 进程没法结束。

 

所以针对这种情况,一般会将后台运行的线程设置为 daemon 线程。如果 JVM 只剩 daemon 线程,就会销毁所有 daemon 线程,并退出 JVM 进程。

 

所以应该将监控微服务存活状态的线程设置为 daemon 线程,这样如果微服务注册中心的工作线程挂了,监控存活状态的线程也能被销毁。


public class ServiceAliveMonitor {    ...    //负责监控微服务存活状态的后台线程    private Daemon daemon;
public ServiceAliveMonitor() { this.daemon = new Daemon(); //只要设置了这个标志位,就代表这个线程是一个daemon线程(后台线程) //非daemon线程一般称为工作线程,如果工作线程(main线程)都结束了, daemon线程是不会阻止JVM进程退出的 //daemon线程会跟着JVM进程一起退出 daemon.setDaemon(true); daemon.setName("serviceAliveMonitor"); } ...}

public class RegisterServer { public static void main(String[] args) throws Exception { ... //开启一个后台线程,检测微服务的存活状态 ServiceAliveMonitor serviceAliveMonitor = new ServiceAliveMonitor(); serviceAliveMonitor.start();
//register-server当然不会只有一个main线程作为工作线程 //register-server一般作为一个Web工程部署在一个Web服务器里 //register-server最核心的工作线程,就是接收和处理register-client发送过来的请求 //正常来说,只要有工作线程,register-server的JVM是不会随便退出的 //如果register-server的工作线程都停止了,那么daemon线程就会跟着JVM进程一起退出
//目前案例里没有添加网络的实现,暂时在main方法里使用while(true),以便让main线程(工作线程)不要退出 //这样就可以保证服务一直运行, daemon线程(监控线程)也会正常工作 while(true) { Thread.sleep(30 * 1000); } }}
复制代码


(3)守护线程的应用场景


一.垃圾回收器就采用了守护线程


如果一个程序中没有任何用户线程,那么就不会产生垃圾,垃圾回收器也就不需要工作了。

 

二.心跳检测等定时异步执行任务


这些都是在后台不断执行的任务,当进程退出时,这些任务也不需要存在,而守护进程可以自动结束自己的生命周期。

 

8.一般不常用到的 ThreadGroup 是什么


(1)ThreadGroup 线程组


ThreadGroup 是线程组,意思就是可以把多个线程加入一个线程组里,这样的好处就是可以将多个线程作为一个整体进行统一的管理和设置。

 

(2)每个线程都有一个父线程


在 Java 里,每个线程都有一个父线程。在线程 A 里创建一个线程 B,那么线程 B 的父线程就是线程 A。比如,Java 都是通过 main 启动的,那么有一个主要的线程就是 mian 线程。在 main 线程里启动的线程,父线程就是 main 线程。

 

(3)每个线程都属于一个线程组


在 Java 里,每个线程都必然属于一个线程组。默认情况下,如果创建一个线程没指定线程组,就会属于父线程的线程组。main 线程的线程组就是 main ThreadGroup。

 

ServiceAliveMonitor 线程的父线程是 main 线程,ServiceAliveMonitor 线程的线程组也是 main 线程的线程组。

 

(4)每个线程都有一个名字


在 Java 里,线程都是有名字的。默认情况下,main 线程的名字就是叫 main。其他线程的名字,一般是叫做 Thread-0 等。

 

(5)线程组也有父线程组


我们可以手动创建一个线程组,将线程加入这个线程组中。我们创建线程组的时候,如果没有手动指定他的父线程组,那么其实默认的父线程组就是 main 线程的线程组。

 

(6)线程组管理多个线程的方法


如果把多个线程放到线程组里去,可以使用如下方法统一进行如下管理:


一.enumerate():复制线程组里的线程二.activeCount():获取线程组里活跃的线程三.getName()、getParent()、list()四.interrupt():打断所有的线程五.destroy():一次性destroy所有的线程
复制代码


JDK 虽然提供了 ThreadGroup,但是平时开发或者很多开源项目都很少用。

 

9.一般不会手动设置线程的优先级


设置线程的优先级,理论上可以让优先级高的线程尽量多执行,但是一般实践中很少会设置线程优先级。因为虽然设置了优先级,但 CPU 可能还是不会按优先级来执行线程。

 

线程的优先级一般是在 1~10 之间,可以通过 ThreadGroup 指定优先级,线程优先级不能大于 ThreadGroup 的优先级。

 

10.Thread 线程的初始化过程


(1)Thread 线程的初始化源码


public class Thread implements Runnable {    //The group of this thread    private ThreadGroup group;    //Whether or not the thread is a daemon thread.    private boolean daemon = false;    //The priority of this thread.    private int priority;    //The context ClassLoader for this thread    private ClassLoader contextClassLoader;    //What will be run.    private Runnable target;    //The requested stack size for this thread, or 0 if the creator did not specify a stack size.    //It is up to the VM to do whatever it likes with this number; some VMs will ignore it.    private long stackSize;    ...    //线程ID    private long tid;    //For generating thread ID    private static long threadSeqNumber;    ...        //Allocates a new Thread object.    public Thread() {        init(null, null, "Thread-" + nextThreadNum(), 0);    }
//Initializes a Thread with the current AccessControlContext. private void init(ThreadGroup g, Runnable target, String name, long stackSize) { init(g, target, name, stackSize, null, true); }
//Initializes a Thread. //@param g, the Thread group //@param target, the object whose run() method gets called //@param name, the name of the new Thread //@param stackSize, the desired stack size for the new thread, or zero to indicate that this parameter is to be ignored. //@param acc, the AccessControlContext to inherit, or AccessController.getContext() if null //@param inheritThreadLocals, if true, inherit initial values for inheritable thread-locals from the constructing thread private void init(ThreadGroup g, Runnable target, String name, long stackSize, AccessControlContext acc, boolean inheritThreadLocals) { ... Thread parent = currentThread(); ... if (g == null) { ... if (g == null) { g = parent.getThreadGroup(); } } ... this.group = g;//设置线程组 this.daemon = parent.isDaemon();//设置daemon状态 this.priority = parent.getPriority();//设置优先级 //设置线程上下文类加载器contextClassLoader if (security == null || isCCLOverridden(parent.getClass())) { this.contextClassLoader = parent.getContextClassLoader(); } else { this.contextClassLoader = parent.contextClassLoader; } this.inheritedAccessControlContext = acc != null ? acc : AccessController.getContext(); this.target = target; setPriority(priority); if (inheritThreadLocals && parent.inheritableThreadLocals != null) { this.inheritableThreadLocals = ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); } //Stash the specified stack size in case the VM cares this.stackSize = stackSize; //Set thread ID tid = nextThreadID(); }
private static synchronized long nextThreadID() { return ++threadSeqNumber; }
... //Returns a reference to the currently executing thread object. public static native Thread currentThread(); }
复制代码


(2)Thread 线程的初始化说明


通过 currentThread()方法获取到的 Thread,就是当前要创建线程的线程。比如在 main 线程中创建一个继承了 Thread 的 ServiceAliveMonitor 线程,那么在创建该线程的过程中,通过 currentThread()获取到的就是 main 线程。

 

创建一个线程时,如果不指定线程名称,那么会自动生成的线程名称,比如会自动生成类似于 Thread-0 等名称。

 

创建一个线程时,默认它的父线程就是创建它的那个线程。比如 main 线程创建一个继承了 Thread 的 ServiceAliveMonitor 线程,那么此时 ServiceAliveMonitor 线程的父线程就是 main 线程。

 

创建一个线程时,如果不指定线程组,那么线程组就是父线程的线程组。每个线程必须属于一个 ThreadGroup,如果父线程是 main 线程,那么其线程组就是 main 线程的线程组。

 

创建一个线程时,如果不指定为 daemon,那么 daemon 状态由父线程决定。如果父线程是 daemon 线程,那么它也是 daemon 线程。

 

创建一个线程时,如果不指定优先级,那么就和父线程的优先级一致。

 

此外每个线程都有一个线程 ID,存储在 Thread 对象的 tid 属性中。Thread 对象的 tid 属性通过 Thread 类的静态属性 threadSeqNumber 自增生成。第一个生成的线程的 ID 是 1,之后生成的线程 ID 依次是 2, 3, 4, 5...

 

(3)Thread 线程初始化要点总结


一.由父线程来创建子线程

二.线程的 ThreadGroup 默认与父线程的相同

三.线程的 daemon 状态默认与父线程的相同

四.线程的优先级默认是父线程的优先级

五.线程的名称默认是 Thread-0 格式的名称

六.线程的 ID 是全局递增的,从 1 开始

七.线程的上下文类加载器默认与父线程的相同

 

11.Thread 线程的启动过程


(1)Thread 线程的启动源码


下面是一个线程执行 start()方法进行启动时的源码。注意:不能对一个线程多次调用和执行 start()方法。因为一个线程一旦被执行,那么它的 threadStatus 属性就一定会变成非 0 值。如果多次执行一个线程的 start()方法,会抛出 IllegalThreadStateException。


public class Thread implements Runnable {    private ThreadGroup group;    private volatile int threadStatus = 0;    ...        //Causes this thread to begin execution; the Java Virtual Machine calls the run method of this thread.    //The result is that two threads are running concurrently:     //the current thread (which returns from the call to the start method) and the other thread (which executes its run method).    //It is never legal to start a thread more than once.     //In particular, a thread may not be restarted once it has completed execution.    public synchronized void start() {        if (threadStatus != 0) {            throw new IllegalThreadStateException();        }        //group就是初始化线程时分配的        //将当前线程加入了它属于的那个线程组        group.add(this);        boolean started = false;        try {            start0();            started = true;        } finally {            try {                if (!started) {                    group.threadStartFailed(this);                }            } catch (Throwable ignore) {            }        }    }
private native void start0(); ...}
复制代码


(2)Thread 对象的 start0()方法


Thread 对象的 start0()方法会结合底层的代码机制来启动一个线程。底层执行 Thread.start0()方法时,会执行 Thread.run()或 Runnable.run()方法。

 

如果创建的 Thread 对象重写了 run()方法,那么就会执行被重写的 run()方法,否则就会执行 Thread 对象的 run()方法。

 

如果创建 Thread 对象时传入了 Runnable 对象,且没有重写 Thread.run()方法,那么在执行 Thread 的 run()方法时,就执行传入的 Runnable 对象的 run()方法。


public class Thread implements Runnable {    //What will be run.    private Runnable target;    ...        public Thread(Runnable target) {        init(null, target, "Thread-" + nextThreadNum(), 0);    }
@Override public void run() { //如果初始化线程时传入了Runnable对象,就执行 if (target != null) { target.run(); } } ...}
public class Demo { public static void main(String[] args) { //创建Thread对象时传入一个Runnable对象 new Thread(new Runnable() { public void run() { } }).start(); }}
复制代码


(3)Thread 线程的启动总结


一.不能多次调用 start()方法,因为启动后 threadStatus 就会变成非 0 值

二.启动线程后,这个线程才会加入线程初始化时指定的线程组中

三.启动一个线程实际上是通过 start0()这个 native 方法来启动的

四.线程启动后会执行被重写的 run()方法或传入的 Runnable 的 run()方法

 

12.sleep()方法可让线程暂停一段时间


微服务存活状态监控线程,每次检查完都需要停顿几秒钟。此时会使用 Thread.sleep()这个方法,指定要等待多少毫秒。

 

JDK 1.5 之后引入了 TimeUnit 这个类,也能很方便地实现休眠的效果。不过如果需要动态传入一个外部配置的休眠时间,使用 TimeUnit 要额外判断休眠的时间单位,此时使用 Thread.sleep()更合适。


TimeUnit.HOURS.sleep(1);TimeUnit.MINUTES.sleep(5);TimeUnit.SECONDS.sleep(30);TimeUnit.MILLISECONDS.sleep(500);
复制代码


13.yield()方法可以切换当前线程执行其他线程


如果担心某个线程一直长时间霸占 CPU,导致其他线程很少得到机会来执行。此时可以使用 Thread.yield()方法让当前线程先别执行,让其他线程来先执行。在 Disruptor 框架中就使用了 Thread.yield()方法实现切换线程自旋的等待策略。


14.join()方法实现服务注册线程的阻塞式运行


一.没有使用 join()时


在 main 线程里开启了其他线程,那么 main 线程会和其他线程并发运行。也就是一会儿执行 main 线程的代码,一会儿执行其他线程的代码。

 

二.使用 join()时


在 main 线程里开启一个线程 A,main 线程如果执行了线程 A 的 join()方法,那么就会导致 main 线程被阻塞,main 线程会等待线程 A 执行完毕才会继续。

 

比如,在微服务注册中心的 register-client 代码里,可以在主线程里执行注册线程的 join()方法,这样注册成功才会启动心跳线程。


//负责跟register-server进行通信,发起注册请求和心跳请求public class RegisterClient {    public static final String SERVICE_NAME = "inventory-service";    public static final String IP = "192.168.31.207";    public static final String HOSTNAME = "inventory01";    public static final int PORT = 9000;    private static final Long HEARTBEAT_INTERVAL = 30 * 1000L;    //服务实例ID    private String serviceInstanceId;    //HTTP通信组件    private HttpSender httpSender;
public RegisterClient() { this.serviceInstanceId = UUID.randomUUID().toString().replace("-", ""); this.httpSender = new HttpSender(); }
public void start() { try { //启动一个服务注册线程去完成注册 RegisterWorker registerWorker = new RegisterWorker(); registerWorker.start(); //通过join()方法等待服务注册线程的完成 registerWorker.join(); //服务注册线程执行完后才启动心跳线程 HeartbeatWorker heartbeatWorker = new HeartbeatWorker(); heartbeatWorker.start(); } catch (Exception e) { e.printStackTrace(); } }
//服务注册线程 private class RegisterWorker extends Thread { @Override public void run() { RegisterRequest registerRequest = new RegisterRequest(); registerRequest.setServiceName(SERVICE_NAME); registerRequest.setIp(IP); registerRequest.setHostname(HOSTNAME); registerRequest.setPort(PORT); registerRequest.setServiceInstanceId(serviceInstanceId); RegisterResponse registerResponse = httpSender.register(registerRequest); System.out.println("服务注册的结果是:" + registerResponse.getStatus() + "......"); } }
//心跳线程 private class HeartbeatWorker extends Thread { @Override public void run() { //如果服务注册成功,就进入while true死循环 HeartbeatRequest heartbeatRequest = new HeartbeatRequest(); heartbeatRequest.setServiceName(SERVICE_NAME); heartbeatRequest.setServiceInstanceId(serviceInstanceId); HeartbeatResponse heartbeatResponse = null; while(true) { try { heartbeatResponse = httpSender.heartbeat(heartbeatRequest); System.out.println("心跳的结果为:" + heartbeatResponse.getStatus() + "......"); Thread.sleep(HEARTBEAT_INTERVAL); } catch (Exception e) { e.printStackTrace(); } } } }}
复制代码


15.interrupt()方法如何中断线程和打断休眠


(1)Thread.interrupt()方法的源码


Thread.interrupt()方法会修改线程底层的 interrupt 标志位为 true,Thread.isInterrupted()方法可以获取这个 interrupt 标志位的值。

 

所以可以通过 Thread.isInterrupted()方法来判断是否要继续运行线程,而不是执行 Thread.interrupt()方法,直接就会中断线程不让线程运行。具体就是在 while 循环的条件中通过 isInterrupted()方法获取 interrupt 标志位,如果线程没有被中断,那么 interrupt 标志位就为 false,while 循环正常执行。如果线程被中断,那么 isInterrupted()就会返回 true,于是就终止 while 循环。

 

如果线程正在被 wait()方法、join()方法、sleep()方法、以及 IO 操作阻塞着,那么执行该线程的 interrupt()方法会抛出 InterruptedException 等异常。


public class Thread implements Runnable {    ...    //Interrupts this thread.        //Unless the current thread is interrupting itself, which is always permitted,     //the checkAccess() method of this thread is invoked,     //which may cause a SecurityException to be thrown.        //If this thread is blocked in an invocation of the Object#wait(), Object#wait(long),     //or Object#wait(long, int) methods of the Object class,     //or of the #join(), #join(long), #join(long, int), #sleep(long),     //or #sleep(long, int) methods of this class,     //then its interrupt status will be cleared and it will receive an InterruptedException.        //If this thread is blocked in an I/O operation upon an java.nio.channels.InterruptibleChannel,     //then the channel will be closed, the thread's interrupt status will be set,     //and the thread will receive a java.nio.channels.ClosedByInterruptException.        //If this thread is blocked in a java.nio.channels.Selector,    //then the thread's interrupt status will be set and it will return immediately from the selection operation,     //possibly with a non-zero value, just as if the selector's java.nio.channels.Selector#wakeup method were invoked.        //If none of the previous conditions hold then this thread's interrupt status will be set.     //Interrupting a thread that is not alive need not have any effect.    public void interrupt() {        if (this != Thread.currentThread()) {            checkAccess();        }        synchronized (blockerLock) {            Interruptible b = blocker;            if (b != null) {                interrupt0();//Just to set the interrupt flag                b.interrupt(this);                return;            }        }        interrupt0();//Just to set the interrupt flag    }        //Tests whether this thread has been interrupted.    //The interrupted status of the thread is unaffected by this method.    //A thread interruption ignored because a thread was not alive     //at the time of the interrupt will be reflected by this method returning false.    //return true if this thread has been interrupted; return false otherwise.    public boolean isInterrupted() {        return isInterrupted(false);    }        //Tests if some Thread has been interrupted.      //The interrupted state is reset or not based on the value of ClearInterrupted that is passed.    private native boolean isInterrupted(boolean ClearInterrupted);    private native void interrupt0();    ...}
复制代码


(2)Thread.interrupt()方法如何中断线程


使用 Thread.interrupt()方法中断一个运行着的线程的方法如下:


public class Demo {    public static void main(String[] args) throws Exception {        Thread thread = new Thread() {            @Override            public void run() {                while(!isInterrupted()) {                      System.out.println("线程1在执行工作......");                  }            }        };        thread.start();        Thread.sleep(1000);          thread.interrupt();    }}
复制代码


(3)Thread.interrupt()方法打断休眠抛出异常


Thread.interrupt()方法的一个常见用法就是打断一个线程的休眠。


public class Demo {    public static void main(String[] args) throws Exception {        MyThread thread = new MyThread();        thread.start();        //本来要休眠30秒的, 现在休眠了1秒就interrupt打断休眠        Thread.sleep(1000);        thread.setShouldRun(false);          thread.interrupt();    }
private static class MyThread extends Thread { private Boolean shouldRun = true; @Override public void run() { while (shouldRun) { try { System.out.println("线程1在执行工作......"); Thread.sleep(30 * 1000); } catch (Exception e) { e.printStackTrace(); } } } public void setShouldRun(Boolean shouldRun) { this.shouldRun = shouldRun; } }}
复制代码


16.interrupt()方法实现优雅关闭心跳线程


在一个分布式系统里,通常会有核心的工作线程,并会设计一个 shutdown()方法关闭这个系统。

 

在这个 shutdown()方法会设置工作线程是否需要运行的标志位为 false,设置完标志位为 false 之后,会对工作线程都执行其 interrupt()方法。

 

由于工作线程可能会在不断地运行 while 循环,而每循环一次会进入休眠状态。所以如果想尽快停止系统,那么可用 interrupt()方法打断工作线程的休眠,同时通过判断工作线程需要运行的标志位是否为 false 来立即退出线程。只有所有的工作线程都结束了,JVM 进程才会自动退出。

 

比如在微服务注册中心的 register-client 中新增一个 isRunning 标志位,通过 isRunning 标志位来判断服务实例是否还需要运行来终止心跳线程。


public class RegisterClient {    ...    //服务实例是否还需要继续运行    private Boolean isRunning;    public RegisterClient() {        this.serviceInstanceId = UUID.randomUUID().toString().replace("-", "");        this.httpSender = new HttpSender();        this.heartbeatWorker = new HeartbeatWorker();        this.isRunning = true;    }    ...        //停止运行RegisterClient组件    public void shutdown() {        this.isRunning = false;        this.heartbeatWorker.interrupt();     }    ...        //心跳线程    private class HeartbeatWorker extends Thread {        @Override        public void run() {            //如果注册成功,就进入while true死循环            HeartbeatRequest heartbeatRequest = new HeartbeatRequest();            heartbeatRequest.setServiceName(SERVICE_NAME);              heartbeatRequest.setServiceInstanceId(serviceInstanceId);            HeartbeatResponse heartbeatResponse = null;            while(isRunning) {                 try {                    heartbeatResponse = httpSender.heartbeat(heartbeatRequest);                    System.out.println("心跳的结果为:" + heartbeatResponse.getStatus() + "......");                    Thread.sleep(HEARTBEAT_INTERVAL);                   } catch (Exception e) {                      e.printStackTrace();                }            }        }    }}
复制代码


文章转载自:东阳马生架构

原文链接:https://www.cnblogs.com/mjunz/p/18714543

体验地址:http://www.jnpfsoft.com/?from=001YH

用户头像

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
JUC并发—Thread源码分析及案例应用_Thread_不在线第一只蜗牛_InfoQ写作社区