写点什么

Java 的 wait 和 notify 学习三部曲之一:JVM 源码分析

用户头像
极客good
关注
发布于: 刚刚

thread-A : start wait


thread-B : get lock


thread-C : c thread is start


thread-B : start notify


thread-B : release lock


thread-A : after wait, acquire lock again


thread-A : release lock


thread-C : get lock


thread-C : release lock


针对以上结果,问题来了:第一个问题:将以上代码反复执行多次,结果都是 B 释放锁之后 A 会先得到锁,这又是为什么呢?C 为何不能先拿到锁呢?


第二个问题:线程 C 自开始就执行了 monitorenter 指令,它能得到锁是容易理解的,但是线程 A 呢?在 wait()之后并没有没有 monitorenter 指令,那么它又是如何取得锁的呢?


wait()、notify()这些方法都是 native 方法,所以只有从 JVM 源码寻找答案了,本次阅读的是 openjdk8 的源码;

带上问题去看 JVM 源码

按照 demo 代码执行顺序,我整理了如下问题,带着这些问题去看 JVM 源码可以聚焦主线,不要被一些支线的次要的代码卡住(例如一些异常处理,监控和上报等):


  1. 线程 A 在 wait()的时候做了什么?

  2. 线程 C 启动后,由于此时线程 B 持有锁,那么线程 C 此时在干啥?

  3. 线程 B 在 notify()的时候做了什么?

  4. 线程 B 释放锁的时候做了什么?

源码中最重要的注释信息

在源码中有段注释堪称是整篇文章最重要的说明,请大家始终记住这段信息,处处都用得上:


ObjectWaiter 对象存在于 WaitSet、EntryList、cxq 等集合中,或者正在这些集合中移动


原文如下:



请务必记住这三个集合:WaitSet、EntryList、cxq


好了,接下来看源码分析问题吧:

线程 A 在 wait()的时候做了什么

打开 hotspot/src/share/vm/runtime/objectMonitor.cpp,看 ObjectMonitor::wait 方法:



如上图所示,有两处代码值得我们注意:


  1. 绿框中将当前线程包装成 ObjectWaiter 对象,并且状态为 TS_WAIT,这里对应的是 jstack 看到的线程状态 WAITING;

  2. 红框中调用了 AddWaiter 方法,跟进去看下:



这个 ObjectWaiter 对象被放入了 WaitSet 中,WaitSet 是个环形双向链表(circular doubly linked list)


回到 ObjectMonitor::wait 方法接着往下看,会发现关键代码如下图,当前线程通过 park()方法开始挂起(suspend):



至此,我们把 wait()方法要做的事情就理清了:


  1. 包装成 ObjectWaiter 对象,状态为 TS_WAIT;

  2. ObjectWaiter 对象被放入_WaitSet 中;

  3. 当前线程挂起;

线程 B 持有锁的时候线程 C 在干啥

此时的线程 C 无法进入 synchronized{}代码块,用 jstack 看应该是 BLOCKED 状态,如下图:



我们看看 monitorenter 指令对应的源码吧,位置:openjdk/hotspot/src/share/vm/interpreter/interpreterRuntime.cpp


IRT_ENTRY_NO_ASYNC(void, InterpreterRuntime::monitorenter(JavaThread* thread, BasicObjectLock* elem))


#ifdef ASSERT


thread->last_frame().interpreter_frame_verify_monitor(elem);


#endif


if (PrintBiasedLockingStatistics) {


Atomic::inc(BiasedLocking::slow_path_entry_count_addr());


}


Handle h_obj(thread, elem->obj());


assert(Universe::heap()->is_in_reserved_or_null(h_obj()),


"must be NULL or an object");


if (UseBiasedLocking) {


// Retry fast entry if bias is revoked to avoid unnecessary inflation


ObjectSynchronizer::fast_enter(h_obj, elem->lock(), true, CHECK);


} else {


ObjectSynchronizer::slow_enter(h_obj, elem->lock(), CHECK);


}


assert(Universe::heap()->is_in_reserved_or_null(elem->obj()),


"must be NULL or an object");


#ifdef ASSERT


thread->last_frame().interpreter_frame_verify_monitor(elem);


#endif


IRT_END


上面的代码有个 if (UseBiasedLocking)判断,是判断是否使用偏向锁的,本例中的锁显然已经不属于当前线程 C 了,所以我们还是直接看 slowenter(hobj, elem->lock(), CHECK)方法吧;


打开 openjdk/hotspot/src/share/vm/runtime/synchronizer.cpp:


void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {


markOop mark = obj->mark();


assert(!mark->has_bias_pattern(), "should not see bias pattern here");


//是否处于无锁状态


if (mark->is_neutral()) {


// Anticipate successful CAS -- the ST of the displaced mark must


// be visible <= the ST performed by the CAS.


lock->set_displaced_header(mark);


//无锁状态就去竞争锁


if (mark == (markOop) Atomic::cmpxchg_ptr(lock, obj()->mark_addr(), mark)) {


TEVENT (slow_enter: release stacklock) ;


return ;


}


// Fall through to inflate() ...


} else


if (mark->has_locker() && THREAD->is_lock_owned((address)mark->locker())) {


//如果处于有锁状态,就检查是不是当前线程持有锁,如果是当前线程持有的,就 return,然后就能执行同步代码块中的代码了


assert(lock != mark->locker(), "must not re-lock the same lock");


assert(lock != (BasicLock*)obj->mark(), "don't relock with same BasicLock");


lock->set_displaced_header(NULL);


return;


}


#if 0


// The following optimization isn't particularly useful.


if (mark->has_monitor() && mark->monitor()->is_entered(THREAD)) {


lock->set_displaced_header (NULL) ;


return ;


}


#endif


// The object header will never be displaced to this lock,


// so it does not matter what the value is, except that it


// must be non-zero to avoid looking like a re-entrant lock,


// and must not look locked either.


lock->set_displaced_header(markOopDesc::unused_mark());


//锁膨胀


ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);


}


线程 C 在上面代码中的执行顺序如下:


  1. 判断是否是无锁状态,如果是就通过 Atomic::cmpxchg_ptr 去竞争锁;

  2. 不是无锁状态,就检查当前锁是否是线程 C 持有;

  3. 不是线程 C 持有,调用 inflate 方法开始锁膨胀;


ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);


来看看锁膨胀的源码:


![640?wx_fmt=png](https://img-blog.csdni


【一线大厂Java面试题解析+核心总结学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


mg.cn/img_convert/4a5bd29bc6ddc405119e23b0fd389f30.png)


如上图,锁膨胀的代码太长,我们这里只看关键代码吧:红框中,如果当前状态已经是重量级锁,就通过 mark->monitor()方法取得 ObjectMonitor 指针再返回;绿框中,如果还不是重量级锁,就检查是否处于膨胀中状态(其他线程正在膨胀中),如果是膨胀中,就调用 ReadStableMark 方法进行等待,ReadStableMark 方法执行完毕后再通过 continue 继续检查,ReadStableMark 方法中还会调用 os::NakedYield()释放 CPU 资源;


如果红框和绿框的条件都没有命中,目前已经是轻量级锁了(不是重量级锁并且不处于锁膨胀状态),可以开始膨胀了,如下图:



简单来说,锁膨胀就是通过 CAS 将监视器对象 OjectMonitor 的状态设置为 INFLATING,如果 CAS 失败,就在此循环,再走前一副图中的的红框和绿框中的判断,如果 CAS 设置成功,会继续设置 ObjectMonitor 中的 header、owner 等字段,然后 inflate 方法返回监视器对象 OjectMonitor;


看看之前 slow_enter 方法中,调用 inflate 方法的代码如下:


ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);


所以 inflate 方法返回监视器对象 OjectMonitor 之后,会立刻执行 OjectMonitor 的 enter 方法,这个方法中开始竞争锁了,方法在 openjdk/hotspot/src/share/vm/runtime/objectMonitor.cpp 文件中:



如上图,红框中表示 OjectMonitor 的 enter 方法一进来就通过 CAS 将 OjectMonitor 的 owner 设置为当前线程,绿框中表示设置成功的逻辑,第一个 if 表示重入锁的逻辑,第二个 if 表示第一次设置 owner 成功,都意味着竞争锁成功,而我们的线程 C 显然是竞争失败的,会进入下图中的无线循环,反复调用 EnterI 方法:



进入 EnterI 方法看看:



如上图,首先构造一个 ObjectWaiter 对象 node,后面的 for(;;)代码块中来是一段非常巧妙的代码,同一时刻可能有多个线程都竞争锁失败走进这个 EnterI 方法,所以在这个 for 循环中,用 CAS 将 cxq 地址放入 node 的 next,也就是把 node 放到 cxq 队列的首位,如果 CAS 失败,就表示其他线程把 node 放入到 cxq 的首位了,所以通过 for 循环再放一次,只要成功,此 node 就一定在最新的_cxq 队列的首位。


接下来的代码又是一个无限循环,如下图:



从上图可以看出,进入循环后先调用 TryLock 方法竞争一次锁,如果成功了就退出循环,否则就调用 Self->_ParkEvent->park 方法使线程挂起,这里有自旋锁的逻辑,也就是 park 方法带了时间参数,就会在挂起一段时间后自动唤醒,如果不是自旋的条件,就一直挂起等待被其他条件唤醒,线程被唤醒后又会执行 TryLock 方法竞争一次锁,竞争不到继续这个 for 循环;


到这里我们已经把线程 C 在 BLOCK 的时候的逻辑理清楚了,小结如下:


  1. 偏向锁逻辑,未命中;

  2. 如果是无锁状态,就通过 CAS 去竞争锁,此处由于锁已经被线程 B 持有,所以不是无锁状态;

  3. 不是无锁状态,而且锁不是线程 C 持有,执行锁膨胀,构造 OjectMonitor 对象;

  4. 竞争锁,竞争失败就将线程加入_cxq 队列的首位;

  5. 开始无限循环,竞争锁成功就退出循环,竞争失败线程挂起,等待被唤醒后继续竞争;

线程 B 在 notify()的时候做了什么

接下来该线程 B 执行 notify 了,代码是 objectMonitor.cpp 的 ObjectMonitor::notify 方法:



如上图所示,首先是 Policy 的赋值,其次是调用 DequeueWaiter()方法将 WaitSet 队列的第一个值取出并返回,还记得 WaitSet 么?所有 wait 的线程都被包装成 ObjectWaiter 对象然后放进来了;接下来对 ObjectWaiter 对象的处理方式,根据 Policy 的不同而不同:Policy == 0:放入 EntryList 队列的排头位置;Policy == 1:放入 EntryList 队列的末尾位置;Policy == 2:EntryList 队列为空就放入 EntryList,否则放入_cxq 队列的排头位置;



如上图所示,请注意把 ObjectWaiter 的地址写到 cxq 变量的时候要用 CAS 操作,因为此时可能有其他线程正在竞争锁,竞争失败的时候会将自己包装成 ObjectWaiter 对象加入到 cxq 中;


这里的代码有一处疑问,期待着读着您的指教:如果_EntryList 为空,就把 ObjectWaiter 放入 ObjectWaiter 中,为什么要这样做呢?


Policy == 3:放入 cxq 队列中,末尾位置;更新 cxq 变量的值的时候,同样要通过 CAS 注意并发问题;

用户头像

极客good

关注

还未添加个人签名 2021.03.18 加入

还未添加个人简介

评论

发布
暂无评论
Java的wait和notify学习三部曲之一:JVM源码分析