写点什么

Java 多线程:锁

作者:Java-fenn
  • 2022 年 9 月 12 日
    湖南
  • 本文字数:10426 字

    阅读完需:约 34 分钟

Java 多线程:锁


StampedLockStampedLock 其实是对读写锁的一种改进,它支持在读同时进行一个写操作,也就是说,它的性能将会比读写锁更快。


更通俗的讲就是在读锁没有释放的时候是可以获取到一个写锁,获取到写锁之后,读锁阻塞,这一点和读写锁一致,唯一的区别在于 读写锁不支持在没有释放读锁的时候获取写锁 。


StampedLock 有三种模式:


悲观读:允许多个线程获取悲观读锁。


写锁:写锁和悲观读是互斥的。


乐观读:无锁机制,类似于数据库中的乐观锁,它支持在不释放乐观读的时候是可以获取到一个写锁。


参考: 有没有比读写锁更快的锁?


示例代码:


悲观读 + 写锁:


package git.snippets.juc;


import java.util.HashMap;import java.util.Map;import java.util.concurrent.locks.StampedLock;import java.util.logging.Logger;


// 悲观读 + 写锁 public class StampedLockPessimistic {private static final Logger log = Logger.getLogger(StampedLockPessimistic.class.getName());private static final StampedLock lock = new StampedLock();//缓存中存储的数据 private static final Map<String, String> mapCache = new HashMap<>();//模拟数据库存储的数据 private static final Map<String, String> mapDb = new HashMap<>();


static {    mapDb.put("zhangsan", "你好,我是张三");    mapDb.put("sili", "你好,我是李四");}
private static void getInfo(String name) { //获取悲观读 long stamp = lock.readLock(); log.info("线程名:" + Thread.currentThread().getName() + " 获取了悲观读锁" + " 用户名:" + name); try { if ("zhangsan".equals(name)) { log.info("线程名:" + Thread.currentThread().getName() + " 休眠中" + " 用户名:" + name); Thread.sleep(3000); log.info("线程名:" + Thread.currentThread().getName() + " 休眠结束" + " 用户名:" + name); } String info = mapCache.get(name); if (null != info) { log.info("在缓存中获取到了数据"); return; } } catch (InterruptedException e) { log.info("线程名:" + Thread.currentThread().getName() + " 释放了悲观读锁"); e.printStackTrace(); } finally { //释放悲观读 lock.unlock(stamp); }
//获取写锁 stamp = lock.writeLock(); log.info("线程名:" + Thread.currentThread().getName() + " 获取了写锁" + " 用户名:" + name); try { //判断一下缓存中是否被插入了数据 String info = mapCache.get(name); if (null != info) { log.info("获取到了写锁,再次确认在缓存中获取到了数据"); return; } //这里是往数据库获取数据 String infoByDb = mapDb.get(name); //将数据插入缓存 mapCache.put(name, infoByDb); log.info("缓存中没有数据,在数据库获取到了数据"); } finally { //释放写锁 log.info("线程名:" + Thread.currentThread().getName() + " 释放了写锁" + " 用户名:" + name); lock.unlock(stamp); }}
public static void main(String[] args) {
复制代码


//线程 1Thread t1 = new Thread(() -> {getInfo("zhangsan");});


    //线程2    Thread t2 = new Thread(() -> {        getInfo("lisi");    });
//线程启动 t1.start(); t2.start();
//线程同步 try { t1.join(); t2.join(); } catch (InterruptedException e) { e.printStackTrace(); }}
复制代码


}乐观读:


package git.snippets.juc;


import java.util.concurrent.locks.StampedLock;import java.util.logging.Logger;


// 乐观写 public class StampedLockOptimistic {private static final Logger log = Logger.getLogger(StampedLockOptimistic.class.getName());private static final StampedLock lock = new StampedLock();private static int num1 = 1;private static int num2 = 1;


/** * 修改成员变量的值,+1 * * @return */private static int sum() {    log.info("求和方法被执行了");    //获取乐观读    long stamp = lock.tryOptimisticRead();    int cnum1 = num1;    int cnum2 = num2;    log.info("获取到的成员变量值,cnum1:" + cnum1 + "   cnum2:" + cnum2);    try {        //休眠3秒,目的是为了让其他线程修改掉成员变量的值。        Thread.sleep(3000);    } catch (InterruptedException e) {        e.printStackTrace();    }    //判断在运行期间是否存在写操作   true:不存在   false:存在    if (!lock.validate(stamp)) {        log.info("存在写操作!");        //存在写锁        //升级悲观读锁        stamp = lock.readLock();        try {            log.info("升级悲观读锁");            cnum1 = num1;            cnum2 = num2;            log.info("重新获取了成员变量的值=========== cnum1=" + cnum1 + "    cnum2=" + cnum2);        } finally {            //释放悲观读锁            lock.unlock(stamp);        }    }    return cnum1 + cnum2;}
//使用写锁修改成员变量的值private static void updateNum() { long stamp = lock.writeLock(); try { num1 = 2; num2 = 2; } finally { lock.unlock(stamp); }}

public static void main(String[] args) throws InterruptedException { Thread t1 = new Thread(() -> { int sum = sum(); log.info("求和结果:" + sum); }); t1.start(); //休眠1秒,目的为了让线程t1能执行到获取成员变量之后 Thread.sleep(1000); updateNum(); t1.join(); log.info("执行完毕");
}
复制代码


}使用 StampedLock 的注意事项看名字就能看出来 StampedLock 不支持重入锁。


它适用于读多写少的情况,如果不是这种情况,请慎用,性能可能还不如 synchronized 。


StampedLock 的悲观读锁、写锁不支持条件变量。


千万不能中断阻塞的悲观读锁或写锁,如果调用阻塞线程的 interrupt() ,会导致 cpu 飙升,如果希望 StampedLock 支持中断操作,请使用 readLockInterruptibly (悲观读锁)与 writeLockInterruptibly (写锁)。


CountDownLatch 类似门闩的概念,可以替代 join ,但是比 join 灵活,因为一个线程里面可以多次 countDown ,但是 join 一定要等线程完成才能执行。


其底层原理是:调用 await() 方法的线程会利用 AQS 排队,一旦数字减为 0,则会将 AQS 中排队的线程依次唤醒。


代码如下:


package git.snippets.juc;


import java.util.concurrent.CountDownLatch;


/**


  • CountDownLatch 可以用 Join 替代*/public class CountDownLatchAndJoin {public static void main(String[] args) {useCountDownLatch();useJoin();}

  • public static void useCountDownLatch() {// use countdownlatchlong start = System.currentTimeMillis();Thread[] threads = new Thread[100000];CountDownLatch latch = new CountDownLatch(threads.length);

  • }

  • public static void useJoin() {long start = System.currentTimeMillis();

  • }}CyclicBarrier 类似栅栏,类比:满了 20 个乘客就发车 这样的场景。


比如:一个程序可能收集如下来源的数据:


数据库


网络


文件


程序可以并发执行,用线程操作 1,2,3,然后操作完毕后再合并, 然后执行后续的逻辑操作,就可以使用 CyclicBarrier


代码如下:


package git.snippets.juc;


import java.util.concurrent.BrokenBarrierException;import java.util.concurrent.CyclicBarrier;


/**


  • CyclicBarrier 示例:满员发车

  • @author <a href="mailto:410486047@qq.com">Grey</a>

  • @since 1.8*/public class CyclicBarrierTest {public static void main(String[] args) {CyclicBarrier barrier = new CyclicBarrier(20, () -> {System.out.println("满了 20,发车");});for (int i = 0; i < 100; i++) {new Thread(() -> {try {barrier.await();} catch (InterruptedException | BrokenBarrierException e) {e.printStackTrace();}}).start();}}}Semaphore 表示信号量,有如下两个操作:


s.acquire() 信号量减 1


s.release() 信号量加 1


到 0 以后,就不能执行了,这个可以用于 限流 。


底层原理是:如果没有线程许可可用,则线程阻塞,并通过 AQS 来排队,可以通过 release() 方法来释放许可,当某个线程释放了某个许可后,会从 AQS 中正在排队的第一个线程依次开始唤醒,直到没有空闲许可。


Semaphore 使用示例:有 N 个线程来访问,我需要限制同时运行的只有信号量大小的线程数。


代码如下:


package git.snippets.juc;


import java.util.concurrent.Semaphore;import java.util.concurrent.TimeUnit;


/**


  • Semaphore 用于限流*/public class SemaphoreUsage {public static void main(String[] args) {Semaphore semaphore = new Semaphore(1);new Thread(() -> {try {semaphore.acquire();TimeUnit.SECONDS.sleep(2);System.out.println("Thread 1 executed");} catch (Exception e) {e.printStackTrace();} finally {semaphore.release();}}).start();

  • }}Semaphore 可以有 公平 和 非公平 的方式进行配置。


Semaphore 和 CountDownLatch 的区别?


Semaphore 是信号量,可以做限流,限制 n 个线程并发,释放一个线程后就又能进来一个新的线程。


CountDownLatch 是闭锁,带有阻塞的功能,必须等到 n 个线程都执行完后,被阻塞的线程才能继续往下执行。


Guava RateLimiter 采用令牌桶算法,用于限流


示例代码如下


package git.snippets.juc;


import com.google.common.util.concurrent.RateLimiter;import java.util.List;import java.util.concurrent.Executor;


/**


  • @author <a href="mailto:410486047@qq.com">Grey</a>

  • @date 2021/4/21

  • @since*/public class RateLimiterUsage {//每秒只发出 2 个令牌 static final RateLimiter rateLimiter = RateLimiter.create(2.0);static void submitTasks(List<Runnable> tasks, Executor executor) {for (Runnable task : tasks) {rateLimiter.acquire(); // 也许需要等待 executor.execute(task);}}}注:上述代码需要引入 Guava 包 。


Phaser(Since jdk1.7)遗传算法,可以用这个结婚的场景模拟: 假设婚礼的宾客有 5 个人,加上新郎和新娘,一共 7 个人。 我们可以把这 7 个人看成 7 个线程,有如下步骤要执行。


到达婚礼现场


吃饭


离开


拥抱(只有新郎和新娘线程可以执行)


每个阶段执行完毕后才能执行下一个阶段,其中拥抱阶段只有新郎新娘这两个线程才能执行。


以上需求,我们可以通过 Phaser 来实现,具体代码和注释如下:


package git.snippets.juc;


import java.util.Random;import java.util.concurrent.Phaser;import java.util.concurrent.TimeUnit;


public class PhaserUsage {static final Random R = new Random();static WeddingPhaser phaser = new WeddingPhaser();


static void millSleep() {    try {        TimeUnit.MILLISECONDS.sleep(R.nextInt(1000));    } catch (InterruptedException e) {        e.printStackTrace();    }}
public static void main(String[] args) { // 宾客的人数 final int guestNum = 5; // 新郎和新娘 final int mainNum = 2; phaser.bulkRegister(mainNum + guestNum); for (int i = 0; i < guestNum; i++) { new Thread(new Person("宾客" + i)).start(); } new Thread(new Person("新娘")).start(); new Thread(new Person("新郎")).start();}
static class WeddingPhaser extends Phaser { @Override protected boolean onAdvance(int phase, int registeredParties) { switch (phase) { case 0: System.out.println("所有人到齐"); return false; case 1: System.out.println("所有人吃饭"); return false; case 2: System.out.println("所有人离开"); return false; case 3: System.out.println("新郎新娘拥抱"); return true; default: return true; } }}
static class Person implements Runnable { String name;
Person(String name) { this.name = name; }
@Override public void run() { // 先到达婚礼现场 arrive(); // 吃饭 eat(); // 离开 leave(); // 拥抱,只保留新郎和新娘两个线程可以执行 hug(); }
private void arrive() { millSleep(); System.out.println("name:" + name + " 到来"); phaser.arriveAndAwaitAdvance(); }
private void eat() { millSleep(); System.out.println("name:" + name + " 吃饭"); phaser.arriveAndAwaitAdvance(); }
private void leave() { millSleep(); System.out.println("name:" + name + " 离开"); phaser.arriveAndAwaitAdvance(); }
private void hug() { if ("新娘".equals(name) || "新郎".equals(name)) { millSleep(); System.out.println("新娘新郎拥抱"); phaser.arriveAndAwaitAdvance(); } else { phaser.arriveAndDeregister(); } }}
复制代码


}Exchanger 用于线程之间交换数据, exchange() 方法是阻塞的,所以要两个 exchange 行为都执行到才会触发交换。


package git.snippets.juc;


import java.util.concurrent.Exchanger;import java.util.concurrent.TimeUnit;


/**


  • Exchanger 用于两个线程之间交换变量*/public class ExchangerUsage {static Exchanger<String> semaphore = new Exchanger<>();

  • public static void main(String[] args) {

  • }}LockSupport 其他锁的底层用的是 AQS


原先让线程等待需要 wait/await ,现在仅需要 LockSupport.park()


原先叫醒线程需要 notify/notifyAll ,现在仅需要 LockSupport.unpark() , LockSupport.unpark() 还可以叫醒指定线程,


示例代码:


package git.snippets.juc;


import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.LockSupport;


/**


  • 阻塞指定线程,唤醒指定线程*/public class LockSupportUsage {public static void main(String[] args) {Thread t = new Thread(() -> {for (int i = 0; i < 10; i++) {try {if (i == 5) {LockSupport.park();}if (i == 8) {LockSupport.park();}TimeUnit.SECONDS.sleep(1);System.out.println(i);} catch (InterruptedException e) {e.printStackTrace();}}});t.start();// unpark 可以先于 park 调用//LockSupport.unpark(t);try {TimeUnit.SECONDS.sleep(8);} catch (InterruptedException e) {e.printStackTrace();}

  • }}实现一个监控元素的容器实现一个容器,提供两个方法


// 向容器中增加一个元素 void add(T t);// 返回容器大小 int size();有两个线程,线程 1 添加 10 个元素到容器中,线程 2 实现监控元素的个数,当个数到 5 个时,线程 2 给出提示并结束


方法 1. 使用 wait + notify 实现


方法 2. 使用 CountDownLatch 实现


方法 3. 使用 LockSupport 实现


代码如下:


package git.snippets.juc;


import java.util.ArrayList;import java.util.Collections;import java.util.List;import java.util.concurrent.CountDownLatch;import java.util.concurrent.TimeUnit;import java.util.concurrent.locks.LockSupport;


// 实现一个容器,提供两个方法,add,size,有两个线程,// 线程 1 添加 10 个元素到容器中,// 线程 2 实现监控元素的个数,// 当个数到 5 个时,线程 2 给出提示并结束 public class MonitorContainer {


public static void main(String[] args) {
useLockSupport(); // useCountDownLatch(); // useNotifyAndWait();}

/** * 使用LockSupport */private static void useLockSupport() { System.out.println("use LockSupport..."); Thread adder; List<Object> list = Collections.synchronizedList(new ArrayList<>()); Thread finalMonitor = new Thread(() -> { LockSupport.park(); if (match(list)) { System.out.println("filled 5 elements size is " + list.size()); LockSupport.unpark(null); } }); adder = new Thread(() -> { for (int i = 0; i < 10; i++) { increment(list); if (match(list)) { LockSupport.unpark(finalMonitor); } } }); adder.start(); finalMonitor.start();}
/** * 使用CountDownLatch */private static void useCountDownLatch() { System.out.println("use CountDownLatch..."); List<Object> list = Collections.synchronizedList(new ArrayList<>()); CountDownLatch latch = new CountDownLatch(5); Thread adder = new Thread(() -> { for (int i = 0; i < 10; i++) { increment(list); if (i <= 4) { latch.countDown(); } } }); Thread monitor = new Thread(() -> { try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } if (match(list)) { System.out.println("filled 5 elements"); } }); adder.start(); monitor.start();}
/** * notify + wait 实现 */private static void useNotifyAndWait() { System.out.println("use notify and wait..."); List<Object> list = Collections.synchronizedList(new ArrayList<>()); final Object o = new Object(); Thread adder = new Thread(() -> { synchronized (o) { for (int i = 0; i < 10; i++) { increment(list); if (match(list)) { o.notify(); try { o.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } System.out.println("add finished"); o.notify(); } }); Thread monitor = new Thread(() -> { synchronized (o) { if (match(list)) { System.out.println("5 elements added " + list.size()); o.notify(); try { o.wait(); System.out.println("monitor finished"); o.notify(); } catch (InterruptedException e) { e.printStackTrace(); }
} } }); adder.start(); monitor.start();}
/** * 只要是5的倍数,就循环打印 */private static void useNotifyAndWaitLoop() { List<Object> list = Collections.synchronizedList(new ArrayList<>()); final Object o = new Object(); Thread adder = new Thread(() -> { synchronized (o) { for (; ; ) { increment(list); if (match(list)) { o.notify(); try { o.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } }); Thread monitor = new Thread(() -> { synchronized (o) { while (true) { if (match(list)) { System.out.println("filled 5 elements"); } o.notify(); try { o.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } }); adder.start(); monitor.start();}
private static void increment(List<Object> list) { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e1) { e1.printStackTrace(); } list.add(new Object()); System.out.println("list add the ele, size is " + list.size());}
private static boolean match(List<Object> list) { return list.size() % 5 == 0;}
复制代码


}生产者消费者问题写一个固定容量的同步容器,拥有 put 和 get 方法,以及 getCount 方法,能够支持 2 个生产者线程以及 10 个消费者线程的阻塞调用。


方法 1. 使用 wait/notifyAll


方法 2. ReentrantLock 的 Condition ,本质就是等待队列


package git.snippets.juc;


import java.util.LinkedList;import java.util.concurrent.locks.Condition;import java.util.concurrent.locks.ReentrantLock;


// 写一个固定容量的同步容器,拥有 put 和 get 方法,以及 getCount 方法,能够支持 2 个生产者线程以及 10 个消费者线程的阻塞调用。public class ProducerAndConsumer {public static void main(String[] args) {// MyContainerByCondition container = new MyContainerByCondition(100);MyContainerByNotifyAndWait container = new MyContainerByNotifyAndWait(100);for (int i = 0; i < 25; i++) {new Thread(container::get).start();}for (int i = 0; i < 20; i++) {new Thread(() -> container.put(new Object())).start();}}}


// 使用 ReentrantLock 的 Conditionclass MyContainerByCondition {static ReentrantLock lock = new ReentrantLock();final int MAX;private final LinkedList<Object> list = new LinkedList<>();Condition consumer = lock.newCondition();Condition producer = lock.newCondition();


public MyContainerByCondition(int limit) {    this.MAX = limit;}
public void put(Object object) { lock.lock(); try { while (getCount() == MAX) { System.out.println("container is full"); try { producer.await(); } catch (InterruptedException e) { e.printStackTrace(); } } list.add(object); consumer.signalAll(); System.out.println("contain add a object, current size " + getCount());
} finally { lock.unlock(); }
}
public Object get() { lock.lock(); try { while (getCount() == 0) { try { System.out.println("container is empty"); consumer.await(); } catch (InterruptedException e) { e.printStackTrace(); } } Object object = list.removeFirst(); producer.signalAll();
System.out.println("contain get a object, current size " + getCount()); return object; } finally { lock.unlock(); }
}
public synchronized int getCount() { return list.size();}
复制代码


}


// 使用 synchronized 的 wait 和 notifyAllclass MyContainerByNotifyAndWait {LinkedList<Object> list = null;final int limit;


MyContainerByNotifyAndWait(int limit) {    this.limit = limit;    list = new LinkedList<>();}
synchronized int getCount() { return list.size();}
// index 从0开始计数synchronized Object get() { while (list.size() == 0) { System.out.println("container is empty"); try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Object o = list.removeFirst();
System.out.println("get a data"); this.notifyAll(); return o;}
synchronized void put(Object data) { while (list.size() > limit) { System.out.println("container is full , do not add any more"); try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } list.add(data);
System.out.println("add a data"); this.notifyAll();}
复制代码


}说明本文涉及到的所有代码和图例

用户头像

Java-fenn

关注

需要Java资料或者咨询可加我v : Jimbye 2022.08.16 加入

还未添加个人简介

评论

发布
暂无评论
Java 多线程:锁_Java_Java-fenn_InfoQ写作社区