写点什么

并发容器与并发控制 - JUC

发布于: 2021 年 04 月 11 日
并发容器与并发控制 - JUC

摘要

  • 为什么没见人用VectorHashtable了?HashMap它又线程不安全在哪里?

  • ConcurrentHashMap的进化与骚操作有哪些?

  • Copy-On-Write 是个啥思想?有哪些例子?

  • 为什么需要并发队列?又有哪些我们视而不见的并发的队列?

  • 当我们想控制线程的先来后到时该咋办?一个个去讲道理吗?

并发容器

先总览一下这些并发容器都在整什么幺蛾子

1. Concurrent*  大部分是通过CAS实现并发2. CopyOnWrite*  复制一份原数据3. Blocking*  通过AQS实现复制代码
复制代码

一、为什么 Vector 和 Hashtable 被留在了历史的长河中?

很简单,别想太多,就是因为性能不好。

那为什么性能不好呢?它性能丢失在哪儿?

我们先看一看Vectorget方法:



look, look, 这个synchronized是直接修饰在方法上的,如果你上下翻翻,就可以发现基本上这个类的所有方法都是synchronized修饰的。

那有人可能问了,为什么锁方法就性能差?因为这个锁粒度是实例对象呀

Hashtable也是如此。

二、为什么 HashMap 是线程不安全的?

  1. 同时 put 碰撞导致数据丢失

    比如两个线程都放数据,使用同样的 key,那么它们计算出来的 hash 值是一样的并放在同一位置,这必然会导致一个数据丢失

  2. 同时 put 扩容导致数据丢失

    同时扩容时,只会保留一个扩容的数组。

  1. 死循环造成 CPU100%

    多线程同时扩容,可能会造成循环链表导致 CPU100%。但是这个问题本身追究并无意义(Sun 官方也是这么认为的,他觉得你就不应该用 hashMap 来解决并发场景,这本身就有问题不是吗?)。如果着实有兴趣可以瞅瞅这个 coolshell.cn/articles/96…

三、ConcurrentHashMap 1.7 与 1.8 的差别是什么?

  1. 数据结构

1.7 时的结构(Segment + HashEntry + Unsafe):


1.8 时的结构(移除 Segment,使锁的粒度更小,Synchronized + CAS + Node + Unsafe):

2.Hash 碰撞

将原本的拉链法变为如果节点数超过 8 个以后就变换为红黑树。

这里有一个小问题,为什么超过 8 就转为红黑树?为什么是 8?为什么不一开始就使用红黑树?

  1. 红黑树每个节点占用空间是链表的两倍。

  2. 作者用泊松分布的概率计算,到 8 的时候概率也就是极小了。

3.保证并发安全

1.7 使用分段锁 Segment(extends ReentreenLock)来保证 16 个段的安全(段不可动态增加);1.8 使用CAS + synchronized来保证,粒度为每个 Node。


4.查询复杂度

当冲突超过 8 个时也能保证查询效率,由原来的拉链法的O(n)变为红黑树的O(logn)

四、ConcurrentHashMap 如何实现如 a++这样的组合操作?

显然,ConcurrentHashMap是保证了同时 put 的并发安全性,但是并没有说,你同时执行a++时也是线程安全的:

int a = map.get(key);int b = a+1;map.put(key, b);
复制代码

那让我们来想想,咋解决这个问题呢?

把这段用synchronized包起来吗?

那使用ConcurrentHashMap还有什么意义呢?我们来看看这样一个方法:replace

// 利用CAS的思想,去循环的判断并进行++的操作,直到成功为止// replace保证了并发修改的安全性while (true) {    int ori = map.get("key");    int tar = ori + 1;    boolean isSuccess = map.replace("key", ori, tar);    if (isSuccess) {        break;    }}复制代码
复制代码

再看一个组合操作,用于解决相同 key 的 put 覆盖问题:putIfAbsent

// 核心思想如下表示if (!map.contains("key")){    return map.put("key", "value");}else {    return map.get("key");}复制代码
复制代码

五、CopyOnWriteArrayList 适用场景有哪些?

我们先想想为什么会有这个玩意?

  1. 用来代替VectorSynchronizedList,就和ConcurrentHashMap代替SynchronizedMap的原因一样

  2. VectorSynchronizedList的锁的粒度太大,并发效率相比较低,并且在遍历时无法修改。

  3. Copy-On-Write 容器还包括CopyOnWriteArraySet,用于替代同步的Set


好了,现在我们来说说适用场景

  1. 读多写少

读操作需要尽可能的快,而写操作哪怕慢一些也无伤大雅,例如:黑名单场景。

2.写入实时性要求不高

就是说,我的修改不是说非得立即生效,哪怕是有那么一小会,还是在读之前的数据,也没关系。

CopyOnWriteArrayList 的读写规则是什么?

  • 读读同步、读写同步、写写互斥

  • 看起来是不是和读写锁的规则贼像?但是不一样哦。这里有一个升级,就是你即便在写,我也是可以读的。

  • 我们来看一下写入的时候

  • 上来先锁住,然后整一个新的数组出来,把新元素怼进去,再改一下原地址,解锁,一气呵成。

  • 读取的时候呢?


  • 啥保护措施都没有的,读就完事了。

实现原理是什么?

  • CopyOnWrite 原理:写入时会先搞一个副本出来,写好以后改地址

  • “不变性”原理:每个 list 都相当于一个不可变的集合,没有修改操作在上面进行

那有何缺点呢?

  • 数据一致性:只能保证最终数据的一致性,不能保证实时一致性。

  • 内存占用:副本可是占用一倍的内存的哦。

六、并发队列又是什么?

就是保证了并发安全的队列,分为阻塞队列(如下图)和非阻塞队列,阻塞与非阻塞的区别就是,你放入(或者取出)元素时,在队列已满(或者已空)的情况下会不会阻塞。


阻塞队列最典型的应用就是生产者消费者。 

那上图的这些队列有何特点呢?我们一一道来

  1. ArrayBlockingQueue: 有界、可指定容量、底层由数组实现、可设置公平与否

我们可以看到在放入元素那一步使用了可重入锁的锁住(可打断)的方法。

2.PriorityBlockingQueue: 支持优先级设置(依据对象的自然排序顺序或者是构造函数所带的 Comparator)、队列的出入顺序是设置的顺序(不是先进先出)、无界队列、PriorityQueue的线程安全版本。

3.LinkedBlockingQueue: 默认无界(容量为 Integer.MAX_VALUE,容量可指定)、内部结构为一个 Node 数组+两把锁、一般情况下性能优于ArrayBlockingQueue(锁粒度更小)


这里使用的是Condition + ReentrantLock,其实就相当于 synchronized + Object.wait() + Object.notify(),这是一个适用于生产者消费者的绝佳队列,如果想看原生实现可以瞅瞅我在并发原理中利用 wait + notify 实现的生产者消费者。

我们看这个放入元素的方法就可以理解到:如果队列已满,则阻塞,否则就放一个元素,如果还可以继续放,则唤醒在 notFull 中等待的线程,如果放之前队列为空,则唤醒在 notEmpty 上等待的线程。

4.SynchronousQueue: 容量为 0 不是 1、线程池Executors.newCacheThreadPool()使用的阻塞队列


它没有实际的容量,任意线程(生产者线程或者消费者线程,生产类型的操作比如 put,offer,消费类型的操作比如 poll,take)都会等待知道获得数据或者交付完成数据才会返回,一个生产者线程的使命是将线程附着着的数据交付给一个消费者线程,而一个消费者线程则是等待一个生产者线程的数据。

5.ConcurrentLinkedQueue: 并发包中唯一一个非阻塞的队列,内部由链表实现、核心思想也是 CAS。

6.DelayQueue: 延迟队列(根据延迟时间排序)、元素需实现Delayed接口(规定排序规则)

控制并发流程

先瞅瞅都有哪些可以控制并发流程

七、CountDownLatch 有哪些典型的适用场景?

  1. 一个线程等多个线程执行完成

在媒体处理中,拼接命令需要等所有待拼接的任务下载完成后才执行。

2.多个线程等一个线程执行完

压测场景下,所有线程等一声令下立马向服务器施加压力,而不是还去慢悠悠的创建线程。

3.多等多

下面举一个小栗子,模拟运动员跑步的场景,所有运动员等枪响后开跑,待运动员都到达终点后裁判才宣布比赛结束。


CountDownLatch begin = new CountDownLatch(1);CountDownLatch end = new CountDownLatch(10);ExecutorService service = Executors.newFixedThreadPool(5);for (int i = 0; i < 10; i++) {    final int no = i + 1;    Runnable runnable = () -> {        System.out.println("No." + no + "准备完毕,等待枪响");        try {            begin.await();            System.out.println("No." + no + "开始冲刺了");            Thread.sleep((long) (Math.random() * 10000));            System.out.println("No." + no + "跑到终点了");        } catch (InterruptedException e) {            e.printStackTrace();        } finally {            end.countDown();        }    };    service.submit(runnable);}//裁判员搞一些准备工作...Thread.sleep(5000);System.out.println("枪响,比赛开始!");begin.countDown();
end.await();System.out.println("所有人到达终点,比赛结束!");
复制代码

这个类不能重用,即不可以重新计数。如果需要重用,考虑 CyclicBarrier 和建新的实例。

八、Semaphore 有哪些适用场景?

需要控制并发的线程数时。

例如:在媒体处理的场景中,一台机器的 CPU 性能是有限的,我们最多只能允许 3 个转码任务同时进行,这个时候就需要控制执行转码的线程数量了。

我们用一张图来理解一下:

接下来看一个小 Demo

static Semaphore semaphore = new Semaphore(3, true);
public static void main(String[] args) { ExecutorService service = Executors.newFixedThreadPool(50); for (int i = 0; i < 100; i++) { service.submit(new Task()); } service.shutdown();}
static class Task implements Runnable {
@Override public void run() { try { semaphore.acquire(); // 这里需要设置请求的数量 //semaphore.acquire(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "拿到了许可证"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + "释放了许可证"); semaphore.release(); // 也可以设置释放的数量 //semaphore.release(2); }}
复制代码

注意点:

  1. 获取和释放的数量必须一致(比如你请求的是 3 个,但是你却每次只释放 2 个,那这个许可证岂不是就越用越少了,这玩意并没有在类的层面进行限制,所以需要编码时候注意)。

  2. 公平性一般设置为 true 比较合理(因为信号量一般用在控制慢服务,如果你还设置不公平,就很容易导致饥饿)。

  3. 并不是必须由请求的线程去释放,也可以由其他线程去释放(可以是线程 A 获取了 2 个,线程 B 没有获取,只去释放 2 个)。

九、CyclicBarrier 与 CountDownLatch 的区别是什么?

  1. 作用不同:CyclicBarrier需要等待固定数量的线程到达栅栏后才能继续执行,而CountDownLatch只需要计数到 0 即可。也就是说,CountDownLatch用于事件,CyclicBarrier用于线程。

来看一个CyclicBarrier的 Demo:

public static void main(String[] args) {    CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> System.out.println("所有人都到场了, 大家统一出发!"));    for (int i = 0; i < 10; i++) {        new Thread(new Task(i, cyclicBarrier)).start();    }}
static class Task implements Runnable{ private int id; private CyclicBarrier cyclicBarrier;
public Task(int id, CyclicBarrier cyclicBarrier) { this.id = id; this.cyclicBarrier = cyclicBarrier; }
@Override public void run() { System.out.println("线程" + id + "现在前往集合地点"); try { Thread.sleep((long) (Math.random()*10000)); System.out.println("线程"+id+"到了集合地点,开始等待其他人到达"); cyclicBarrier.await(); System.out.println("线程"+id+"出发了"); } catch (InterruptedException | BrokenBarrierException e) { e.printStackTrace(); } }}
复制代码


  1. 可重用性不同:new CyclicBarrier(5)等 5 个线程到了后执行run,还可以继续等接下来的 5 个线程到。

总结

呼呼,总算说完了花里胡哨的并发容器,现在知道了可以用 CopyOnWrite 去解决读多写少的场景;用阻塞队列去实现生产者消费者模型;用CountDownLatch来驯服一个个溜得贼快的线程。

下一章我们搞个大事情,并发界的总统山:AQS。尝试着利用AQS去实现一个我们自己的并发工具。


原文链接:https://juejin.cn/post/6844904194457993230


发布于: 2021 年 04 月 11 日阅读数: 50
用户头像

领取文章中资料添加小助理vx:mxzFAFAFA 2021.02.05 加入

Java架构大数据每天分享干货!

评论

发布
暂无评论
并发容器与并发控制 - JUC