写点什么

☕️【Java 技术之旅】【AbstractQueuedSynchronizer】教你自定义实现自己的同步器

发布于: 2021 年 07 月 28 日
☕️【Java技术之旅】【AbstractQueuedSynchronizer】教你自定义实现自己的同步器

前提概要

之前的文章中会涉及到了相关 AQS 的原理和相关源码的分析,所谓实践是检验真理的唯一标准!接下来就让我们活化一下 AQS 技术,主要针对于自己动手实现一个 AQS 同步器。

定义 MyLock 实现 Lock

Doug Lea 大神在 JDK1.5 编写了一个 Lock 接口,里面定义了实现一个锁的基本方法,我们只需编写一个 MyLock 类实现这个接口就好。


class MyLock implements Lock {    /**     * 加锁。如果不成功则进入等待队列     */    @Override    public void lock() {}    /**    * 加锁(可被interrupt)    */    @Override    public void lockInterruptibly() throws InterruptedException {}    /**     * 尝试加锁     */    @Override    public boolean tryLock() {}    /**     * 加锁 带超时的     */    @Override    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {}    /**    * 释放锁    */    @Override    public void unlock() {}    /**    * 返回一个条件变量(不在本案例谈论)    */    @Override    public Condition newCondition() {}}
复制代码


定义好 MyLock 后,接下来就是实现各个方法的逻辑,达到真正的用于线程间 sync 互斥的需求。

自定义一个 MySync 继承自 AQS

接下来我们需要自定义一个继承自 AQS 的 MySync。实现自定义的 MySync 前,先了解 AQS 内部的一些基本概念。在 AQS 中主要的一些成员属性如下:



  • state:用于标记资源状态,如果为 0 表示资源没有被占用,可以加锁成功。如果大于 0 表示资源已经被占用,然后根据自己的定义去实现是否允许对共享资源进行操作

  • 比如:ReentrantLock 的实现方式是当 state 大于 0,那么表示已经有线程获得锁了,我们都知道 ReentrantLock 是可重入的,其原理就是当有线程次进入同一个 lock 标记的临界区时。先判断这个线程是否是获得锁的那个线程,如果是,state 会+1,此时 state 会等于 2。

  • 当 unlock 时,会一层一层的减 1,直到 state 等于 0 则表示完全释放锁成功。

  • head、tail:用于存放获得锁失败的线程。在 AQS 中,每一个线程会被封装成一个 Node 节点,这些节点如果获得锁资源失败会链在 head、tail 中,成为一个双向链表结构。

  • exclusiveOwnerThread用于存放当前获得锁的线程,正如在 state 说明的那样。ReentrantLock 判断可重入的条件就是用这个 exclusiveOwnerThread 线程跟申请获得锁的线程做比较,如果是同一个线程,则 state+1,并重入加锁成功


知道这些概念后我们就可以自定义一个 AQS:


public final class MySync extends AbstractQueuedSynchronizer {    /**    * 尝试加锁    */    @Override    protected boolean tryAcquire(int arg) {        if (compareAndSetState(0, 1)) {            // 修改state状态成功后设置当前线程为占有锁资源线程            setExclusiveOwnerThread(Thread.currentThread());            return true;        }        return false;    }    /**    * 释放锁    */    @Override    protected boolean tryRelease(int arg) {        setExclusiveOwnerThread(null);        // state有volatile修饰,为了保证解锁后其他的一些变量对其他线程可见,把setExclusiveOwnerThread(null)放到上面 happens-before中定义的 volatile规则        setState(0);        return true;    }    /**    * 判断是否是独占锁    */    @Override    protected boolean isHeldExclusively() {        return getState() == 1;    }}
复制代码

将 MySync 组合进 MyLock

最后一步就是将第一步中的所有方法逻辑完成


class MyLock implements Lock {
// 组合自定义sync器 private MySync sync = new MySync();
/** * 加锁。如果不成功则进入等待队列 */ public void lock() { sync.acquire(1); } /** * 加锁(可被interrupt) */ public void lockInterruptibly() throws InterruptedException { sync.acquireInterruptibly(1); } /** * 尝试加锁 */ public boolean tryLock() { return sync.tryAcquire(1); } /** * 加锁 带超时的 */ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1, unit.toMillis(time)); } /** * 释放锁 */ public void unlock() { sync.release(0); } /** * 返回一个条件变量(不在本案例谈论) */ @Override public Condition newCondition() { return null; }}
复制代码


完成整个 MyLock 的逻辑后,发现在 lock()、unlock()中调用的自定义 sync 的方法 tryAcquire()和 tryRelease()方法。我们就以在 lock()方法中调用 acquire()方法说明模板设计模式在 AQS 中的应用。


点进.acquire()方法后,发现改该方法是来自 AbstractQueuedSynchronizer 中:



  • 在这里面可以看到 tryAcquire 方法,继续点进去看看 tryAcquire(),发现该方法是一个必须被重写的方法,否则抛出一个运行时异常。

  • 模板方法设计模式在这里得以体现,再回到我们第二部中自定义的 MySync 中,就是重写了 AQS 中的 tryAcquire()方法。



因此整个自定义加锁的流程如下:


  • 调用 MyLock 的 lock(),lock()方法调用 AQS 的 acquire()方法

  • 在 acquire()方法中调用了 tryAcquire()方法进行加锁

  • 而 tryAcquire()方法在 AQS 中是一个必须让子类自定义重写的方法,否则会抛出一个异常

  • 因此调用 tryAcquire()时实际上是调用了我们自定义的 MySync 类中 tryAcquire()方法

总结

AQS 作为 Java 并发体系下的关键类,在各种并发工具中都有它的身影,如 ReentrantLock、Semaphore 等。这些并发工具用于控制 sync 互斥的手段都是采用 AQS,外加 Cas 机制。AQS 采用了模板方法设计模式让子类们自定义 sync 互斥的条件,比如本案例中 MySync 类重写了 tryAcquire 方法。


下面实现一个自定义的 sync:


public class SelfSynchronizer {
private final Sync sync = new Sync();
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public boolean unLock() { return sync.release(1); }
static class Sync extends AbstractQueuedSynchronizer { //是否处于占用状态 @Override protected boolean isHeldExclusively() { return getState() == 1; }
/** * 获取sync资源 * @param acquires * @return */ @Override public boolean tryAcquire(int acquires) { if(compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } //这里没有考虑可重入锁 /*else if (Thread.currentThread() == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; }*/ return false; }
/** * 释放sync资源 * @param releases * @return */ @Override protected boolean tryRelease(int releases) { int c = getState() - releases; boolean free = false; if (c == 0) { free = true; } setState(c); return free; } }}
复制代码


ReentrantLock 源码和上面自定义的 sync 很相似,测试下该 sync,i++在多线程下执行情况:


public class TestSelfSynchronizer {    private static int a = 0;    private static int b = 0;    private static SelfSynchronizer selfSynchronizer = new SelfSynchronizer();    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 50, 1, TimeUnit.SECONDS,            new LinkedBlockingQueue<Runnable>());    private static ExecutorService ec = Executors.newFixedThreadPool(20);    public static void main(String[] args) throws InterruptedException {        for (int i = 0; i < 20 ; i++) {            executor.submit(new Task());        }        for (int j = 0; j < 20 ; j++) {            ec.submit(new TaskSync());        }        Thread.sleep(10000);        System.out.println("a的值:"+ a);        System.out.println("b的值" + b);        executor.shutdown();        ec.shutdown();    }    static class Task implements Runnable {        @Override        public void run() {            for(int i=0;i<10000;i++) {                a++;            }        }    }    static class TaskSync implements Runnable {        @Override        public void run() {            for (int i = 0; i < 10000; i++) {              //使用sync器加锁                selfSynchronizer.lock();                b++;                selfSynchronizer.unLock();            }        }    }}
复制代码


开启两个线程池,对 int 型变量自增 10000 次,如果不加 sync 器,最后值小于 200000,使用了自定义 sync 器则最后值正常等于 200000,这是因为每次自增操作加锁

发布于: 2021 年 07 月 28 日阅读数: 1144
用户头像

🏆2021年InfoQ写作平台-签约作者 🏆 2020.03.25 加入

👑【酷爱计算机技术、醉心开发编程、喜爱健身运动、热衷悬疑推理的”极客狂人“】 🏅 【Java技术领域,MySQL技术领域,APM全链路追踪技术及微服务、分布式方向的技术体系等】 我们始于迷惘,终于更高水平的迷惘

评论

发布
暂无评论
☕️【Java技术之旅】【AbstractQueuedSynchronizer】教你自定义实现自己的同步器