写点什么

Java 的 wait()、notify() 学习三部曲之一:JVM 源码分析

作者:程序员欣宸
  • 2022 年 4 月 22 日
  • 本文字数:6675 字

    阅读完需:约 22 分钟

Java的wait()、notify()学习三部曲之一:JVM源码分析

欢迎访问我的 GitHub

这里分类和汇总了欣宸的全部原创(含配套源码):https://github.com/zq2599/blog_demos


综述

  • Java 的 wait()、notify()学习三部曲由三篇文章组成,内容分别是:

一、通过阅读 openjdk8 的源码,分析和理解 wait,notify 在 JVM 中的具体执行过程;

二、修改 JVM 源码,编译构建成新的 JVM,把我们感兴趣的参数打印出来,结合具体代码检查和我们的理解是否一致;

三、修改 JVM 源码,编译构建成新的 JVM,按照我们的理解去修改关键参数,看能否达到预期效果;

  • 现在,咱们一起开始既漫长又深入的 wait、notify 学习之旅吧!

wait()和 notify()的通常用法

  • Java 多线程开发中,我们常用到 wait()和 notify()方法来实现线程间的协作,简单的说步骤如下:


  1. A 线程取得锁,执行 wait(),释放锁;

  2. B 线程取得锁,完成业务后执行 notify(),再释放锁;

  3. B 线程释放锁之后,A 线程取得锁,继续执行 wait()之后的代码;

关于 synchronize 修饰的代码块

  • 通常,对于 synchronize(lock){...}这样的代码块,编译后会生成 monitorenter 和 monitorexit 指令,线程执行到 monitorenter 指令时会尝试取得 lock 对应的 monitor 的所有权(CAS 设置对象头),取得后即获取到锁,执行 monitorexit 指令时会释放 monitor 的所有权即释放锁;

一个完整的 demo

  • 为了深入学习 wait()和 notify(),先用完整的 demo 程序来模拟场景吧,以下是源码:


public class NotifyDemo {
private static void sleep(long sleepVal){ try{ Thread.sleep(sleepVal); }catch(Exception e){ e.printStackTrace(); } }
private static void log(String desc){ System.out.println(Thread.currentThread().getName() + " : " + desc); }
Object lock = new Object();
public void startThreadA(){ new Thread(() -> { synchronized (lock){ log("get lock"); startThreadB(); log("start wait"); try { lock.wait(); }catch(InterruptedException e){ e.printStackTrace(); }
log("get lock after wait"); log("release lock"); } }, "thread-A").start(); }
public void startThreadB(){ new Thread(()->{ synchronized (lock){ log("get lock"); startThreadC(); sleep(100); log("start notify"); lock.notify(); log("release lock");
} },"thread-B").start(); }
public void startThreadC(){ new Thread(() -> { synchronized (lock){ log("get lock"); log("release lock"); } }, "thread-C").start(); }
public static void main(String[] args){ new NotifyDemo().startThreadA(); }}
复制代码


  • 以上就是本次实战用到的 demo,代码功能简述如下:

  1. 启动线程 A,取得锁之后先启动线程 B 再执行 wait()方法,释放锁并等待;

  2. 线程 B 启动之后会等待锁,A 线程执行 wait()之后,线程 B 取得锁,然后启动线程 C,再执行 notify 唤醒线程 A,最后退出 synchronize 代码块,释放锁;

  3. 线程 C 启动之后就一直在等待锁,这时候线程 B 还没有退出 synchronize 代码块,锁还在线程 B 手里

  4. 线程 A 在线程 B 执行 notify()之后就一直在等待锁,这时候线程 B 还没有退出 synchronize 代码块,锁还在线程 B 手里

  5. 线程 B 退出 synchronize 代码块,释放锁之后,线程 A 和线程 C 竞争锁;

  • 把上面的代码在 Openjdk8 下面执行,反复执行多次,都得到以下结果:

thread-A : get lockthread-A : start waitthread-B : get lockthread-C : c thread is startthread-B : start notifythread-B : release lockthread-A : after wait, acquire lock againthread-A : release lockthread-C : get lockthread-C : release lock
复制代码


  • 针对以上结果,问题来了:

  • 第一个问题:

  • 将以上代码反复执行多次,结果都是 B 释放锁之后 A 会先得到锁,这又是为什么呢?C 为何不能先拿到锁呢?

  • 第二个问题:

  • 线程 C 自开始就执行了 monitorenter 指令,它能得到锁是容易理解的,但是线程 A 呢?在 wait()之后并没有没有 monitorenter 指令,那么它又是如何取得锁的呢?

  • wait()、notify()这些方法都是 native 方法,所以只有从 JVM 源码寻找答案了,本次阅读的是 openjdk8 的源码;

带上问题去看 JVM 源码

  • 按照 demo 代码执行顺序,我整理了如下问题,带着这些问题去看 JVM 源码可以聚焦主线,不要被一些支线的次要的代码卡住(例如一些异常处理,监控和上报等):


  1. 线程 A 在 wait()的时候做了什么?

  2. 线程 C 启动后,由于此时线程 B 持有锁,那么线程 C 此时在干啥?

  3. 线程 B 在 notify()的时候做了什么?

  4. 线程 B 释放锁的时候做了什么?


源码中最重要的注释信息


  • 在源码中有段注释堪称是整篇文章最重要的说明,请大家始终记住这段信息,处处都用得上:


ObjectWaiter 对象存在于 WaitSet、EntryList、cxq 等集合中,或者正在这些集合中移动

  • 原文如下:

请务必记住这三个集合:WaitSet、EntryList、cxq


  • 好了,接下来看源码分析问题吧:

线程 A 在 wait()的时候做了什么

  • 打开 hotspot/src/share/vm/runtime/objectMonitor.cpp,看 ObjectMonitor::wait 方法:

  • 如上图所示,有两处代码值得我们注意:


  1. 绿框中将当前线程包装成 ObjectWaiter 对象,并且状态为 TS_WAIT,这里对应的是 jstack 看到的线程状态 WAITING;

  2. 红框中调用了 AddWaiter 方法,跟进去看下:


  • 这个 ObjectWaiter 对象被放入了_WaitSet 中,_WaitSet 是个环形双向链表(circular doubly linked list)

  • 回到 ObjectMonitor::wait 方法接着往下看,会发现关键代码如下图,当前线程通过 park()方法开始挂起(suspend):

  • 至此,我们把 wait()方法要做的事情就理清了:


  1. 包装成 ObjectWaiter 对象,状态为 TS_WAIT;

  2. ObjectWaiter 对象被放入_WaitSet 中;

  3. 当前线程挂起;

线程 B 持有锁的时候线程 C 在干啥

  • 此时的线程 C 无法进入 synchronized{}代码块,用 jstack 看应该是 BLOCKED 状态,如下图:

  • 我们看看 monitorenter 指令对应的源码吧,位置:openjdk/hotspot/src/share/vm/interpreter/interpreterRuntime.cpp

IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))#ifdef ASSERT  thread->last_frame().interpreter_frame_verify_monitor(elem);#endif  if (PrintBiasedLockingStatistics) {    Atomic::inc(BiasedLocking::slow_path_entry_count_addr());  }  Handle h_obj(thread, elem->obj());  assert(Universe::heap()->is_in_reserved_or_null(h_obj()),         "must be NULL or an object");  if (UseBiasedLocking) {    // Retry fast entry if bias is revoked to avoid unnecessary inflation    ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);  } else {    ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);  }  assert(Universe::heap()->is_in_reserved_or_null(elem->obj()),         "must be NULL or an object");#ifdef ASSERT  thread->last_frame().interpreter_frame_verify_monitor(elem);#endifIRT_END
复制代码


  • 上面的代码有个 if (UseBiasedLocking)判断,是判断是否使用偏向锁的,本例中的锁显然已经不属于当前线程 C 了,所以我们还是直接看 slow_enter(h_obj, elem->lock(), CHECK)方法吧;

  • 打开 openjdk/hotspot/src/share/vm/runtime/synchronizer.cpp:

void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {  markOop mark = obj->mark();  assert(!mark->has_bias_pattern(), "should not see bias pattern here");
//是否处于无锁状态 if (mark->is_neutral()) { // Anticipate successful CAS -- the ST of the displaced mark must // be visible <= the ST performed by the CAS. lock->set_displaced_header(mark); //无锁状态就去竞争锁 if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) { TEVENT (slow_enter: release stacklock) ; return ; } // Fall through to inflate() ... } else if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) { //如果处于有锁状态,就检查是不是当前线程持有锁,如果是当前线程持有的,就return,然后就能执行同步代码块中的代码了 assert(lock != mark->locker(), "must not re-lock the same lock"); assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock"); lock->set_displaced_header(NULL); return; }
#if 0 // The following optimization isn't particularly useful. if (mark->has_monitor() && mark->monitor()->is_entered(THREAD)) { lock->set_displaced_header (NULL) ; return ; }#endif
// The object header will never be displaced to this lock, // so it does not matter what the value is, except that it // must be non-zero to avoid looking like a re-entrant lock, // and must not look locked either. lock->set_displaced_header(markOopDesc::unused_mark()); //锁膨胀 ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);}
复制代码


  • 线程 C 在上面代码中的执行顺序如下:

  1. 判断是否是无锁状态,如果是就通过 Atomic::cmpxchg_ptr 去竞争锁;

  2. 不是无锁状态,就检查当前锁是否是线程 C 持有;

  3. 不是线程 C 持有,调用 inflate 方法开始锁膨胀;

ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);

  • 来看看锁膨胀的源码:

  • 如上图,锁膨胀的代码太长,我们这里只看关键代码吧:

  • 红框中,如果当前状态已经是重量级锁,就通过 mark->monitor()方法取得 ObjectMonitor 指针再返回;

  • 绿框中,如果还不是重量级锁,就检查是否处于膨胀中状态(其他线程正在膨胀中),如果是膨胀中,就调用 ReadStableMark 方法进行等待,ReadStableMark 方法执行完毕后再通过 continue 继续检查,ReadStableMark 方法中还会调用 os::NakedYield()释放 CPU 资源;

  • 如果红框和绿框的条件都没有命中,目前已经是轻量级锁了(不是重量级锁并且不处于锁膨胀状态),可以开始膨胀了,如下图:


  • 简单来说,锁膨胀就是通过 CAS 将监视器对象 OjectMonitor 的状态设置为 INFLATING,如果 CAS 失败,就在此循环,再走前一副图中的的红框和绿框中的判断,如果 CAS 设置成功,会继续设置 ObjectMonitor 中的 header、owner 等字段,然后 inflate 方法返回监视器对象 OjectMonitor;

  • 看看之前 slow_enter 方法中,调用 inflate 方法的代码如下:

ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
复制代码
  • 进入 EnterI 方法看看:


  • 如上图,首先构造一个 ObjectWaiter 对象 node,后面的 for(;;)代码块中来是一段非常巧妙的代码,同一时刻可能有多个线程都竞争锁失败走进这个 EnterI 方法,所以在这个 for 循环中,用 CAS 将_cxq 地址放入 node 的_next,也就是把 node 放到_cxq 队列的首位,如果 CAS 失败,就表示其他线程把 node 放入到_cxq 的首位了,所以通过 for 循环再放一次,只要成功,此 node 就一定在最新的_cxq 队列的首位。

  • 接下来的代码又是一个无限循环,如下图:


  • 从上图可以看出,进入循环后先调用 TryLock 方法竞争一次锁,如果成功了就退出循环,否则就调用 Self->_ParkEvent->park 方法使线程挂起,这里有自旋锁的逻辑,也就是 park 方法带了时间参数,就会在挂起一段时间后自动唤醒,如果不是自旋的条件,就一直挂起等待被其他条件唤醒,线程被唤醒后又会执行 TryLock 方法竞争一次锁,竞争不到继续这个 for 循环;

  • 到这里我们已经把线程 C 在 BLOCK 的时候的逻辑理清楚了,小结如下:


  1. 偏向锁逻辑,未命中;

  2. 如果是无锁状态,就通过 CAS 去竞争锁,此处由于锁已经被线程 B 持有,所以不是无锁状态;

  3. 不是无锁状态,而且锁不是线程 C 持有,执行锁膨胀,构造 OjectMonitor 对象;

  4. 竞争锁,竞争失败就将线程加入_cxq 队列的首位;

  5. 开始无限循环,竞争锁成功就退出循环,竞争失败线程挂起,等待被唤醒后继续竞争;

线程 B 在 notify()的时候做了什么

  • 接下来该线程 B 执行 notify 了,代码是 objectMonitor.cpp 的 ObjectMonitor::notify 方法:

  • 如上图所示,首先是 Policy 的赋值,其次是调用 DequeueWaiter()方法将_WaitSet 队列的第一个值取出并返回,还记得_WaitSet 么?所有 wait 的线程都被包装成 ObjectWaiter 对象然后放进来了;

  • 接下来对 ObjectWaiter 对象的处理方式,根据 Policy 的不同而不同:

  • Policy == 0:放入_EntryList 队列的排头位置;

  • Policy == 1:放入_EntryList 队列的末尾位置;

  • Policy == 2:_EntryList 队列为空就放入_EntryList,否则放入_cxq 队列的排头位置;



  • 如上图所示,请注意把 ObjectWaiter 的地址写到_cxq 变量的时候要用 CAS 操作,因为此时可能有其他线程正在竞争锁,竞争失败的时候会将自己包装成 ObjectWaiter 对象加入到_cxq 中;

  • 这里的代码有一处疑问,期待着读着您的指教:如果_EntryList 为空,就把 ObjectWaiter 放入 ObjectWaiter 中,为什么要这样做呢?

  • Policy == 3:放入_cxq 队列中,末尾位置;更新_cxq 变量的值的时候,同样要通过 CAS 注意并发问题;

  • 这里有一段很巧妙的代码,现将_cxq 保存在 Tail 中,正常情况下将 ObjectWaiter 赋值给 Tail->_next 就可以了,但是此时有可能其他线程正在_cxq 的尾部追加数据了,所以此时 Tail 对象对应的记录就不是最后一条了,那么它的_next 就非空了,一旦发生这种情况,就执行 Tail = Tail->_next,这样就获得了最新的_cxq 的尾部数据,如下图所示:


  • Policy 等于其他值,立即唤醒 ObjectWaiter 对应的线程;

  • 小结一下,线程 B 执行 notify 时候做的事情:


  1. 执行过 wait 的线程都在队列_WaitSet 中,此处从_WaitSet 中取出第一个;

  2. 根据 Policy 的不同,将这个线程放入_EntryList 或者_cxq 队列中的起始或末尾位置;

线程 B 释放锁的时候做了什么

  • 接下来到了揭开问题的关键了,我们来看 objectMonitor.cpp 的 ObjectMonitor::exit 方法;

    如上图,方法一进来先做一些合法性判断,接下来如红框所示,是偏向锁逻辑,偏向次数减一后直接返回,显然线程 B 在此处不会返回,而是继续往下执行;

  • 根据 QMode 的不同,有不同的处理方式:


  1. QMode = 2,并且_cxq 非空:取_cxq 队列排头位置的 ObjectWaiter 对象,调用 ExitEpilog 方法,该方法会唤醒 ObjectWaiter 对象的线程,此处会立即返回,后面的代码不会执行了;

  2. QMode = 3,并且_cxq 非空:把_cxq 队列首元素放入_EntryList 的尾部;

  3. QMode = 4,并且_cxq 非空:把_cxq 队列首元素放入_EntryList 的头部;

  4. QMode = 0,不做什么,继续往下看;


  • 只有 QMode=2 的时候会提前返回,等于 0、3、4 的时候都会继续往下执行:

  • 如果_EntryList 的首元素非空,就取出来调用 ExitEpilog 方法,该方法会唤醒 ObjectWaiter 对象的线程,然后立即返回;

  • 如果_EntryList 的首元素为空,就取_cxq 的首元素,放入_EntryList,然后再从_EntryList 中取出来执行 ExitEpilog 方法,然后立即返回;

  • 以上操作,均是执行过 ExitEpilog 方法然后立即返回,如果取出的元素为空,就执行循环继续取;

  • 小结一下,线程 B 释放了锁之后,执行的操作如下:


  1. 偏向锁逻辑,此处未命中;

  2. 根据 QMode 的不同,将 ObjectWaiter 从_cxq 或者_EntryList 中取出后唤醒;

  3. 唤醒的元素会继续执行挂起前的代码,按照我们之前的分析,线程唤醒后,就会通过 CAS 去竞争锁,此时由于线程 B 已经释放了锁,那么此时应该能竞争成功;


  • 到了现在已经将之前的几个问题搞清了,汇总起来看看:


  1. 线程 A 在 wait() 后被加入了_WaitSet 队列中;

  2. 线程 C 被线程 B 启动后竞争锁失败,被加入到_cxq 队列的首位;

  3. 线程 B 在 notify()时,从_WaitSet 中取出第一个,根据 Policy 的不同,将这个线程放入_EntryList 或者_cxq 队列中的起始或末尾位置;

  4. 根据 QMode 的不同,将 ObjectWaiter 从_cxq 或者_EntryList 中取出后唤醒;;


  • 所以,最初的问题已经清楚了,wait()的线程被唤醒后,会进入一个队列,然后 JVM 会根据 Policy 和 QMode 的不同对队列中的 ObjectWaiter 做不同的处理,被选中的 ObjectWaiter 会被唤醒,去竞争锁;

  • 至此,源码分析已结束,但是因为我们不知道 Policy 和 QMode 参数到底是多少,所以还不能对之前的问题有个明确的结果,这些还是留在下一章来解答吧,下一章里我们去修改 JVM 源码,把参数都打印出来;

欢迎关注 InfoQ:程序员欣宸

学习路上,你不孤单,欣宸原创一路相伴...

发布于: 2022 年 04 月 22 日阅读数: 2
用户头像

搜索"程序员欣宸",一起畅游Java宇宙 2018.04.19 加入

前腾讯、前阿里员工,从事Java后台工作,对Docker和Kubernetes充满热爱,所有文章均为作者原创,个人Github:https://github.com/zq2599/blog_demos

评论

发布
暂无评论
Java的wait()、notify()学习三部曲之一:JVM源码分析_Java_程序员欣宸_InfoQ写作社区