写点什么

☕️【Java 技术之旅】从底层分析 LockSupport 原理机制

发布于: 2021 年 05 月 27 日
☕️【Java 技术之旅】从底层分析LockSupport原理机制

从底层分析 LockSupport 原理机制

知识点

LockSupport 的介绍

LockSupport 类是 Java6(JSR166-JUC)引入的一个类,提供了基本的线程同步原语。LockSupport 实际上是调用了 Unsafe 类里的函数,归结到 Unsafe 里,只有两个函数,而仅仅两个简单的接口,就为上层提供了强大的同步原语,先来解析下两个函数是做什么的。


public native void unpark(Thread jthread);  public native void park(boolean isAbsolute, long time);  
复制代码


  • park:阻塞当前线程(Block current thread),字面理解 park,就算占住,停车的时候不就把这个车位给占住了么?起这个名字还是很形象的

  • isAbsolute 参数是指明时间是否属于绝对。

  • time 参数是指时间值


线程调用 park 函数则等待"许可"。


  • unpark: 使给定的线程停止阻塞(Unblock the given thread blocked)

  • thread 参数是指对相应的线程进行解除阻塞。


线程调用 unpark 函数为线程提供"许可(permit)"。


  • 这个有点像信号量,但是这个"许可"是不能叠加的,"许可"是一次性的。

  • 比如,线程 B 连续调用了三次 unpark 函数,当线程 A 调用 park 函数就使用掉这个"许可",如果线程 A 再次调用 park,则进入等待状态


注意,unpark 函数可以先于 park 调用。比如线程 B 调用 unpark 函数,给线程 A 发了一个"许可",那么当线程 A 调用 park 时,它发现已经有"许可"了,那么它会马上再继续运行。(此部分比 wait/notify(notifyAll))要好很多。

park 和 unpark 的灵活之处

unpark 函数可以先于 park 调用,这个正是它们的灵活之处


  • 一个线程它有可能在别的线程 unPark 之前,或者之后,或者同时调用了 park,那么因为 park 的特性,它可以不用担心自己的 park 的时序问题,否则,如果 park 必须要在 unpark 之前。


考虑一下,两个线程同步,要如何处理?


  • 在 Java5 里是用 wait/notify/notifyAll 来同步的。wait/notify 机制有个很蛋疼的地方是,比如线程 B 要用 notify 通知线程 A,那么线程 B 要确保线程 A 已经在 wait 调用上等待了,否则线程 A 可能永远都在等待


另外,是调用 notify,还是 notifyAll?


notify 只会唤醒一个线程,如果错误地有两个线程在同一个对象上 wait 等待,那么又悲剧了。为了安全起见,貌似只能调用 notifyAll 了。


park/unpark 模型真正解耦了线程之间的同步,线程之间不再需要一个 Object 或者其它变量来存储状态,不再需要关心对方的状态

拓展延伸

HotSpot 里 park/unpark 的实现,每个 java 线程都有一个 Parker 实例,Parker 类是这样定义的


class Parker : public os::PlatformParker {  private:    volatile int _counter ;    ...  public:    void park(bool isAbsolute, jlong time);    void unpark();    ...  }  class PlatformParker : public CHeapObj<mtInternal> {    protected:      pthread_mutex_t _mutex [1] ;      pthread_cond_t  _cond  [1] ;      ...  }  
复制代码


  • 可以看到 Parker 类实际上用 Posix 的 mutex,condition 来实现的

  • 在 Parker 类里的_counter 字段,就是用来记录所谓的“许可”的

  • 当调用 park 时,先尝试直接能否直接拿到"许可",即_counter>0 时,如果成功,则把_counter 设置为 0,并返回:(和信号量的思路很像!)



void Parker::park(bool isAbsolute, jlong time) { // Ideally we'd do something useful while spinning, such // as calling unpackTime(). // Optional fast-path check: // Return immediately if a permit is available. // We depend on Atomic::xchg() having full barrier semantics // since we are doing a lock-free update to _counter. if (Atomic::xchg(0, &_counter) > 0) return;
复制代码


如果不成功,则构造一个 ThreadBlockInVM,然后检查_counter 是不是>0,如果是,则把_counter 设置为 0,unlock mutex 并返回:



ThreadBlockInVM tbivm(jt); if (_counter > 0) { // no wait needed _counter = 0; status = pthread_mutex_unlock(_mutex);
复制代码


否则,再判断等待的时间,然后再调用 pthread_cond_wait 函数等待,如果等待返回,则把_counter 设置为 0,unlock mutex 并返回:


if (time == 0) {    status = pthread_cond_wait (_cond, _mutex) ;  }  _counter = 0 ;  status = pthread_mutex_unlock(_mutex) ;  assert_status(status == 0, status, "invariant") ;  OrderAccess::fence();  
复制代码


当 unpark 时,则简单多了,直接设置_counter 为 1,再 unlock mutext 返回。如果_counter 之前的值是 0,则还要调用 pthread_cond_signal 唤醒在 park 中等待的线程


void Parker::unpark() {    int s, status ;    status = pthread_mutex_lock(_mutex);    assert (status == 0, "invariant") ;    s = _counter;    _counter = 1;    if (s < 1) {       if (WorkAroundNPTLTimedWaitHang) {          status = pthread_cond_signal (_cond) ;          assert (status == 0, "invariant") ;          status = pthread_mutex_unlock(_mutex);          assert (status == 0, "invariant") ;       } else {          status = pthread_mutex_unlock(_mutex);          assert (status == 0, "invariant") ;          status = pthread_cond_signal (_cond) ;          assert (status == 0, "invariant") ;       }    } else {      pthread_mutex_unlock(_mutex);      assert (status == 0, "invariant") ;    }  }
复制代码


  • 用 mutex 和 condition 保护了一个_counter 的变量,当 park 时,这个变量置为了 0,当 unpark 时,这个变量置为 1

  • 值得注意的是在 park 函数里,调用 pthread_cond_wait 时,并没有用 while 来判断,所以 posix condition 里的"Spurious wakeup"一样会传递到上层 Java 的代码里


if (time == 0) {    status = pthread_cond_wait (_cond, _mutex) ;  }  
复制代码


这也就是为什么 Java dos 里提到,当下面三种情况下 park 函数会返回:


Some other thread invokes unpark with the current thread as the target; orSome other thread interrupts the current thread; orThe call spuriously (that is, for no reason) returns.
复制代码


相关的实现代码在:


http://hg.openjdk.java.net/build-infra/jdk7/hotspot/file/52c4a1ae6adc/src/share/vm/runtime/park.hpphttp://hg.openjdk.java.net/build-infra/jdk7/hotspot/file/52c4a1ae6adc/src/share/vm/runtime/park.cpphttp://hg.openjdk.java.net/build-infra/jdk7/hotspot/file/52c4a1ae6adc/src/os/linux/vm/os_linux.hpphttp://hg.openjdk.java.net/build-infra/jdk7/hotspot/file/52c4a1ae6adc/src/os/linux/vm/os_linux.cpp


其它的一些东东:Parker 类在分配内存时,使用了一个技巧,重载了 new 函数来实现了 cache line 对齐。


// We use placement-new to force ParkEvent instances to be  // aligned on 256-byte address boundaries.  This ensures that the least  // significant byte of a ParkEvent address is always 0.     void * operator new (size_t sz) ;  Parker里使用了一个无锁的队列在分配释放Parker实例:
复制代码


volatile int Parker::ListLock = 0 ;  Parker * volatile Parker::FreeList = NULL ;    Parker * Parker::Allocate (JavaThread * t) {    guarantee (t != NULL, "invariant") ;    Parker * p ;      // Start by trying to recycle an existing but unassociated    // Parker from the global free list.    for (;;) {      p = FreeList ;      if (p  == NULL) break ;      // 1: Detach      // Tantamount to p = Swap (&FreeList, NULL)      if (Atomic::cmpxchg_ptr (NULL, &FreeList, p) != p) {         continue ;      }        // We've detached the list.  The list in-hand is now      // local to this thread.   This thread can operate on the      // list without risk of interference from other threads.      // 2: Extract -- pop the 1st element from the list.      Parker * List = p->FreeNext ;      if (List == NULL) break ;      for (;;) {          // 3: Try to reattach the residual list          guarantee (List != NULL, "invariant") ;          Parker * Arv =  (Parker *) Atomic::cmpxchg_ptr (List, &FreeList, NULL) ;          if (Arv == NULL) break ;            // New nodes arrived.  Try to detach the recent arrivals.          if (Atomic::cmpxchg_ptr (NULL, &FreeList, Arv) != Arv) {              continue ;          }          guarantee (Arv != NULL, "invariant") ;          // 4: Merge Arv into List          Parker * Tail = List ;          while (Tail->FreeNext != NULL) Tail = Tail->FreeNext ;          Tail->FreeNext = Arv ;      }      break ;    }      if (p != NULL) {      guarantee (p->AssociatedWith == NULL, "invariant") ;    } else {      // Do this the hard way -- materialize a new Parker..      // In rare cases an allocating thread might detach      // a long list -- installing null into FreeList --and      // then stall.  Another thread calling Allocate() would see      // FreeList == null and then invoke the ctor.  In this case we      // end up with more Parkers in circulation than we need, but      // the race is rare and the outcome is benign.      // Ideally, the # of extant Parkers is equal to the      // maximum # of threads that existed at any one time.      // Because of the race mentioned above, segments of the      // freelist can be transiently inaccessible.  At worst      // we may end up with the # of Parkers in circulation      // slightly above the ideal.      p = new Parker() ;    }    p->AssociatedWith = t ;          // Associate p with t    p->FreeNext       = NULL ;    return p ;  }      void Parker::Release (Parker * p) {    if (p == NULL) return ;    guarantee (p->AssociatedWith != NULL, "invariant") ;    guarantee (p->FreeNext == NULL      , "invariant") ;    p->AssociatedWith = NULL ;    for (;;) {      // Push p onto FreeList      Parker * List = FreeList ;      p->FreeNext = List ;      if (Atomic::cmpxchg_ptr (p, &FreeList, List) == List) break ;    }  }  
复制代码


发布于: 2021 年 05 月 27 日阅读数: 88
用户头像

我们始于迷惘,终于更高水平的迷惘。 2020.03.25 加入

🏆 【酷爱计算机技术、醉心开发编程、喜爱健身运动、热衷悬疑推理的”极客狂人“】 🏅 【Java技术领域,MySQL技术领域,APM全链路追踪技术及微服务、分布式方向的技术体系等】 🤝未来我们希望可以共同进步🤝

评论

发布
暂无评论
☕️【Java 技术之旅】从底层分析LockSupport原理机制