Java 并发 JUC(java.util.concurrent)集合不安全
作者:芝士味的椒盐
- 2022 年 5 月 04 日
本文字数:7131 字
阅读完需:约 23 分钟
👨🏻🎓博主介绍:大家好,我是芝士味的椒盐,一名在校大学生,热爱分享知识,很高兴在这里认识大家🌟🌈擅长领域:Java、消息中间件、大数据、运维。
🙏🏻如果本文章各位小伙伴们有帮助的话,🍭关注+👍🏻点赞+🗣评论+📦收藏。
🤝另本人水平有限,旨在创作简单易懂的文章,在文章描述时如有错,恳请各位大佬指正,在此感谢!!!
集合是不安全的
先给大家上个集合框架家族图
List 不安全
package icu.lookyousmileface.notsafe;
import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* @author starrysky
* @title: UnSafeList
* @projectName Juc_Pro
* @description: 集合不安全
* @date 2021/1/2912:04 下午
*/
public class UnSafeList {
public static void main(String[] args) {
// 并发下 ArrayList 不安全的吗,Synchronized;
/**
* 解决方案;
* 1、List<String> list = new Vector<>();
* 2、List<String> list = Collections.synchronizedList(new ArrayList<>());
* 3、List<String> list = new CopyOnWriteArrayList<>();
*/
// CopyOnWrite 写入时复制 COW 计算机程序设计领域的一种优化策略;
// 多个线程调用的时候,list,读取的时候,固定的,写入(覆盖)
// 在写入的时候避免覆盖,造成数据问题!
// 读写分离
// CopyOnWriteArrayList 比 Vector Nb 在哪里?
// List<String> list = new Vector<>();
List<String> list = new CopyOnWriteArrayList();
// List<String> list = Collections.synchronizedList(new ArrayList<>());
// List<String> list = new ArrayList<>();
for (int i = 0; i < 100; i++) {
new Thread(()->{
list.add(UUID.randomUUID().toString().substring(0,5));
System.out.println(list);
},String.valueOf(i)).start();
}
}
}
复制代码
Set 不安全
package icu.lookyousmileface.notsafe;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;
/**
* @author starrysky
* @title: UnSafeSet
* @projectName Juc_Pro
* @description: Set不安全,set底层底层HashMap
* @date 2021/1/291:00 下午
*/
public class UnSafeSet {
public static void main(String[] args) {
/**
* 和List同理
*/
// Set<String> set = new HashSet<>();
// Set<String> set = Collections.synchronizedSet(new HashSet<>());
Set<String> set = new CopyOnWriteArraySet<>();
for (int i = 0; i < 30; i++) {
new Thread(() -> {
set.add(UUID.randomUUID().toString().substring(0, 6));
System.out.println(set);
}, String.valueOf(i)).start();
}
}
}
复制代码
⚠️ Tips:
HashSet底层:底层使用hashmap去重,vlue是常量值,key就是我们传进去的值
复制代码
public HashSet() {
map = new HashMap<>();
}
public boolean add(E e) {
return map.put(e, PRESENT)==null;
}
private static final Object PRESENT = new Object();
复制代码
Map 不安全
package icu.lookyousmileface.notsafe;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author starrysky
* @title: UnSafeMap
* @projectName Juc_Pro
* @description: Map不安全
* @date 2021/1/291:14 下午
*/
public class UnSafeMap {
public static void main(String[] args) {
// map 是这样用的吗? 不是,工作中不用 HashMap
// 默认等价于什么? new HashMap<>(16,0.75);
// Map<String,String> map = new HashMap<>();
// Map<String,String> map = Collections.synchronizedMap(new HashMap<>());
Map<String,String> map = new ConcurrentHashMap<>();
for (int i = 0; i < 30; i++) {
new Thread(()->{
map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5));
System.out.println(map);
},String.valueOf(i)).start();
}
}
}
复制代码
⚠️ Tips:new HashMap<>();默认等价于new HashMap<>(16,0.75);,工作中不用 HashMap
复制代码
Callable
代码实现:
package icu.lookyousmileface.callable;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
/**
* @author starrysky
* @title: CallableUse
* @projectName Juc_Pro
* @description: Callable代替了Runable接口,run/call,有异常有返回值
* @date 2021/1/291:43 下午
*/
public class CallableUse {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//适配类
FutureTask<Integer> integerFutureTask = new FutureTask<>(new MyThread());
new Thread(integerFutureTask,"A").start();
new Thread(integerFutureTask,"B").start();//有缓存,效率高
//有返回值,get会产生阻塞,可以使用异步线程通信长处理
System.out.println(integerFutureTask.get());
}
}
class MyThread implements Callable<Integer> {
@Override
public Integer call() throws Exception {
System.out.println("调用了call方法");
return 1024;
}
}
复制代码
⚠️ Tips:
1、可以有返回值
2、可以抛出异常
3、方法不同,run()/ call()
**细节:**
1、有缓存
2、结果可能需要等待,会阻塞!
复制代码
常用辅助类(必会)
CountDownLatch
package icu.lookyousmileface.concurrentutils;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
/**
* @author starrysky
* @title: CountDownLatchUse
* @projectName Juc_Pro
* @description: CountDownLatch辅助类常用必会
* @date 2021/1/292:13 下午
*/
public class CountDownLatchUse {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <=6; i++) {
new Thread(new FutureTask<Integer>(()-> {
System.out.println(Thread.currentThread().getName()+":"+"走出教室!");
countDownLatch.countDown();
return 1024;}),String.valueOf(i)).start();
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("全员已经都离开来教室,教室大门锁上!!!");
}
}
复制代码
⚠️ 原理:`countDownLatch.countDown()`; // 数量-1,`countDownLatch.await()`; // 等待计数器归零,然后再向下执行,每次有线程调用 countDown() 数量-1,假设计数器变为0,countDownLatch.await() 就会被唤醒,继续执行!
复制代码
CyclicBarrier
package icu.lookyousmileface.concurrentutils;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
* @author starrysky
* @title: CyclicBarrierUse
* @projectName Juc_Pro
* @description: 加法计数器
* @date 2021/1/292:28 下午
*/
public class CyclicBarrierUse {
public static void main(String[] args) {
//CyclicBarrier本身就可以自带一个线程
CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
System.out.println("召唤神龙!");
});
for (int i = 1; i <= 7; i++) {
final int temp = i;
new Thread(()->{
System.out.println(Thread.currentThread().getName()+":收集齐"+temp+"颗龙珠");
try {
//此处的await是等待达到parties计数的时候,执行cyclicBarrier的线程,没有将不会执行
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
}
}
复制代码
Semaphore
信号量,也就是限流的作用,比较常用
package icu.lookyousmileface.concurrentutils;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* @author starrysky
* @title: SemaphoreUse
* @projectName Juc_Pro
* @description:
* @date 2021/1/292:38 下午
*/
public class SemaphoreUse {
public static void main(String[] args) {
//可以用来限制流量
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 6; i++) {
new Thread(()->{
try {
//关闸
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"入库");
TimeUnit.SECONDS.sleep(2);
System.out.println(Thread.currentThread().getName()+"离开");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//开闸
semaphore.release();
}
},String.valueOf(i)).start();
}
}
}
复制代码
⚠️ Tips:
**原理:**
`semaphore.acquire()` 获得,假设如果已经满了,等待,等待被释放为止!`semaphore.release();` 释放,会将当前的信号量释放 + 1,然后唤醒等待的线程!
作用: 多个共享资源互斥的使用!并发限流,控制最大的线程数!
复制代码
读写锁
试验代码:
package icu.lookyousmileface.readeandwritelock;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @author starrysky
* @title: ReadAndWriteLockUse
* @projectName Juc_Pro
* @description: 读写锁
* 独占锁(写锁) 一次只能被一个线程占有
* 共享锁(读锁) 多个线程可以同时占有
* ReadWriteLock
* 读-读 可以共存!
* 读-写 不能共存!
* 写-写 不能共存!
* @date 2021/1/293:08 下午
*/
public class ReadAndWriteLockUse {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for (int i = 1; i <= 6; i++) {
final int temp = i;
new Thread(()->{
myCache.put(String.valueOf(temp),String.valueOf(temp));
},String.valueOf(i)).start();
}
for (int i = 1; i <= 6; i++) {
final int temp = i;
new Thread(()->{
myCache.get(String.valueOf(temp));
},String.valueOf(i)).start();
}
}
}
class MyCache{
private final Map<String,String> map = new HashMap<>(16, 0.75F);
private ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
// 存,写入的时候,只希望同时只有一个线程写
public void put(String key,String value){
reentrantReadWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName()+":正在插入"+key);
map.put(key, value);
System.out.println(Thread.currentThread().getName()+":"+key+"插入完成!");
}catch (Exception e){
e.printStackTrace();
}finally {
reentrantReadWriteLock.writeLock().unlock();
}
}
// 取,读,所有人都可以读!
public void get(String key){
reentrantReadWriteLock.readLock().lock();
try{
System.out.println(Thread.currentThread().getName()+":正在读取"+key);
String value = map.get(key);
System.out.println(Thread.currentThread().getName()+":"+key+"读取完成!"+"值:"+value);
}catch (Exception e){
e.printStackTrace();
}finally {
reentrantReadWriteLock.readLock().unlock();
}
}
}
复制代码
阻塞队列
阻塞队列:
⚠️ Tips:会使用 阻塞队列的地方:多线程并发处理,线程池!
四组 API
试验代码:
SynchronousQueue 同步队列
试验代码:
package icu.lookyousmileface.syncqueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
* @author starrysky
* @title: SyncQueueUse
* @projectName Juc_Pro
* @description: 同步队列
* @date 2021/1/299:17 下午
*/
public class SyncQueueUse {
public static void main(String[] args) throws InterruptedException {
SynchronousQueue<String> syncQueue = new SynchronousQueue<>();
//无须for循环会自动产生一种互喂模式
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + ":" + "A入队列");
syncQueue.put("A");
System.out.println(Thread.currentThread().getName() + ":" + "B入队列");
syncQueue.put("B");
System.out.println(Thread.currentThread().getName() + ":" + "C入队列");
syncQueue.put("C");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T1").start();
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + ":" + syncQueue.take() + "出队列");
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + ":" + syncQueue.take() + "出队列");
TimeUnit.SECONDS.sleep(1);
System.out.println(Thread.currentThread().getName() + ":" + syncQueue.take() + "出队列");
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "T2").start();
}
}
复制代码
⚠️ Tips:
没有容量,进去一个元素,必须等待取出来之后,才能再往里面放一个元素!put、take
划线
评论
复制
发布于: 刚刚阅读数: 2
版权声明: 本文为 InfoQ 作者【芝士味的椒盐】的原创文章。
原文链接:【http://xie.infoq.cn/article/3e24dd97ba11f138c14f66c47】。文章转载请联系作者。
芝士味的椒盐
关注
Java、消息中间件、大数据、运维 2022.04.23 加入
华为云云享专家、51CTOtop红人、CSDN博主、2021年第十届“中国软件杯”大学生软件设计大赛-B3-高并发条件下消息队列的设计与实现国赛二等奖、2021年浙江省职业院校技能大赛高职组“大数据技术与应用”赛项一等奖
评论