写点什么

Java 并发 JUC(java.util.concurrent)集合不安全

  • 2022 年 5 月 04 日
  • 本文字数:7131 字

    阅读完需:约 23 分钟


👨🏻‍🎓博主介绍:大家好,我是芝士味的椒盐,一名在校大学生,热爱分享知识,很高兴在这里认识大家🌟🌈擅长领域:Java、消息中间件、大数据、运维。

🙏🏻如果本文章各位小伙伴们有帮助的话,🍭关注+👍🏻点赞+🗣评论+📦收藏。

🤝另本人水平有限,旨在创作简单易懂的文章,在文章描述时如有错,恳请各位大佬指正,在此感谢!!!

集合是不安全的

  • 先给大家上个集合框架家族图

  • List 不安全


    package icu.lookyousmileface.notsafe;        import java.util.*;    import java.util.concurrent.CopyOnWriteArrayList;        /**     * @author starrysky     * @title: UnSafeList     * @projectName Juc_Pro     * @description: 集合不安全     * @date 2021/1/2912:04 下午     */    public class UnSafeList {        public static void main(String[] args) {            // 并发下 ArrayList 不安全的吗,Synchronized;            /**             * 解决方案;             * 1、List<String> list = new Vector<>();             * 2、List<String> list = Collections.synchronizedList(new ArrayList<>());             * 3、List<String> list = new CopyOnWriteArrayList<>();             */            // CopyOnWrite 写入时复制  COW  计算机程序设计领域的一种优化策略;            // 多个线程调用的时候,list,读取的时候,固定的,写入(覆盖)            // 在写入的时候避免覆盖,造成数据问题!            // 读写分离            // CopyOnWriteArrayList  比 Vector Nb 在哪里?    //        List<String> list = new Vector<>();            List<String> list = new CopyOnWriteArrayList();    //        List<String> list = Collections.synchronizedList(new ArrayList<>());    //        List<String> list = new ArrayList<>();            for (int i = 0; i < 100; i++) {                new Thread(()->{                    list.add(UUID.randomUUID().toString().substring(0,5));                    System.out.println(list);                },String.valueOf(i)).start();            }            }    }
复制代码


  • Set 不安全


    package icu.lookyousmileface.notsafe;        import java.util.Collections;    import java.util.HashSet;    import java.util.Set;    import java.util.UUID;    import java.util.concurrent.CopyOnWriteArraySet;        /**     * @author starrysky     * @title: UnSafeSet     * @projectName Juc_Pro     * @description: Set不安全,set底层底层HashMap     * @date 2021/1/291:00 下午     */    public class UnSafeSet {        public static void main(String[] args) {            /**             * 和List同理             */    //        Set<String> set = new HashSet<>();    //        Set<String> set =  Collections.synchronizedSet(new HashSet<>());            Set<String> set = new CopyOnWriteArraySet<>();            for (int i = 0; i < 30; i++) {                new Thread(() -> {                    set.add(UUID.randomUUID().toString().substring(0, 6));                    System.out.println(set);                    }, String.valueOf(i)).start();            }        }    }
复制代码


⚠️ Tips:
HashSet底层:底层使用hashmap去重,vlue是常量值,key就是我们传进去的值
复制代码


    public HashSet() {            map = new HashMap<>();        }        public boolean add(E e) {            return map.put(e, PRESENT)==null;        }        private static final Object PRESENT = new Object();
复制代码


  • Map 不安全


    package icu.lookyousmileface.notsafe;        import java.util.Collections;    import java.util.HashMap;    import java.util.Map;    import java.util.UUID;    import java.util.concurrent.ConcurrentHashMap;        /**     * @author starrysky     * @title: UnSafeMap     * @projectName Juc_Pro     * @description: Map不安全     * @date 2021/1/291:14 下午     */    public class UnSafeMap {        public static void main(String[] args) {            // map 是这样用的吗? 不是,工作中不用 HashMap            // 默认等价于什么?  new HashMap<>(16,0.75);    //        Map<String,String> map = new HashMap<>();    //        Map<String,String> map = Collections.synchronizedMap(new HashMap<>());            Map<String,String> map =  new ConcurrentHashMap<>();            for (int i = 0; i < 30; i++) {                new Thread(()->{                    map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0,5));                    System.out.println(map);                },String.valueOf(i)).start();            }        }    }
复制代码


⚠️ Tips:new HashMap<>();默认等价于new HashMap<>(16,0.75);,工作中不用 HashMap
复制代码

Callable



  • 代码实现:


    package icu.lookyousmileface.callable;        import java.util.concurrent.Callable;    import java.util.concurrent.ExecutionException;    import java.util.concurrent.FutureTask;        /**     * @author starrysky     * @title: CallableUse     * @projectName Juc_Pro     * @description: Callable代替了Runable接口,run/call,有异常有返回值     * @date 2021/1/291:43 下午     */    public class CallableUse {        public static void main(String[] args) throws ExecutionException, InterruptedException {            //适配类            FutureTask<Integer> integerFutureTask = new FutureTask<>(new MyThread());            new Thread(integerFutureTask,"A").start();            new Thread(integerFutureTask,"B").start();//有缓存,效率高            //有返回值,get会产生阻塞,可以使用异步线程通信长处理            System.out.println(integerFutureTask.get());        }    }        class MyThread implements Callable<Integer> {            @Override        public Integer call() throws Exception {            System.out.println("调用了call方法");            return 1024;        }    }
复制代码


⚠️ Tips:
1、可以有返回值
2、可以抛出异常
3、方法不同,run()/ call()
**细节:**
1、有缓存
2、结果可能需要等待,会阻塞!
复制代码

常用辅助类(必会)

  • CountDownLatch


    package icu.lookyousmileface.concurrentutils;        import java.util.concurrent.Callable;    import java.util.concurrent.CountDownLatch;    import java.util.concurrent.FutureTask;        /**     * @author starrysky     * @title: CountDownLatchUse     * @projectName Juc_Pro     * @description: CountDownLatch辅助类常用必会     * @date 2021/1/292:13 下午     */    public class CountDownLatchUse {        public static void main(String[] args) {            CountDownLatch countDownLatch = new CountDownLatch(6);                for (int i = 1; i <=6; i++) {                new Thread(new FutureTask<Integer>(()-> {                    System.out.println(Thread.currentThread().getName()+":"+"走出教室!");                    countDownLatch.countDown();                    return 1024;}),String.valueOf(i)).start();            }            try {                countDownLatch.await();            } catch (InterruptedException e) {                e.printStackTrace();            }            System.out.println("全员已经都离开来教室,教室大门锁上!!!");        }    }
复制代码


⚠️  原理:`countDownLatch.countDown()`; // 数量-1,`countDownLatch.await()`; // 等待计数器归零,然后再向下执行,每次有线程调用 countDown() 数量-1,假设计数器变为0,countDownLatch.await() 就会被唤醒,继续执行!
复制代码


  • CyclicBarrier


    package icu.lookyousmileface.concurrentutils;        import java.util.concurrent.BrokenBarrierException;    import java.util.concurrent.CyclicBarrier;        /**     * @author starrysky     * @title: CyclicBarrierUse     * @projectName Juc_Pro     * @description: 加法计数器     * @date 2021/1/292:28 下午     */    public class CyclicBarrierUse {        public static void main(String[] args) {    //CyclicBarrier本身就可以自带一个线程            CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {                System.out.println("召唤神龙!");            });                for (int i = 1; i <= 7; i++) {                final int temp = i;                new Thread(()->{                    System.out.println(Thread.currentThread().getName()+":收集齐"+temp+"颗龙珠");                    try {                        //此处的await是等待达到parties计数的时候,执行cyclicBarrier的线程,没有将不会执行                        cyclicBarrier.await();                    } catch (InterruptedException e) {                        e.printStackTrace();                    } catch (BrokenBarrierException e) {                        e.printStackTrace();                    }                },String.valueOf(i)).start();            }        }    }
复制代码


  • Semaphore

  • 信号量,也就是限流的作用,比较常用


    package icu.lookyousmileface.concurrentutils;        import java.util.concurrent.Semaphore;    import java.util.concurrent.TimeUnit;        /**     * @author starrysky     * @title: SemaphoreUse     * @projectName Juc_Pro     * @description:     * @date 2021/1/292:38 下午     */    public class SemaphoreUse {        public static void main(String[] args) {            //可以用来限制流量            Semaphore semaphore = new Semaphore(3);                for (int i = 1; i <= 6; i++) {                new Thread(()->{                    try {                        //关闸                        semaphore.acquire();                        System.out.println(Thread.currentThread().getName()+"入库");                        TimeUnit.SECONDS.sleep(2);                        System.out.println(Thread.currentThread().getName()+"离开");                    } catch (InterruptedException e) {                        e.printStackTrace();                    }finally {                        //开闸                        semaphore.release();                    }                },String.valueOf(i)).start();            }        }    }
复制代码


⚠️ Tips:
**原理:**
`semaphore.acquire()` 获得,假设如果已经满了,等待,等待被释放为止!`semaphore.release();` 释放,会将当前的信号量释放 + 1,然后唤醒等待的线程!
作用: 多个共享资源互斥的使用!并发限流,控制最大的线程数!
复制代码

读写锁

  • 试验代码:


    package icu.lookyousmileface.readeandwritelock;        import java.util.HashMap;    import java.util.Map;    import java.util.concurrent.locks.ReentrantReadWriteLock;        /**     * @author starrysky     * @title: ReadAndWriteLockUse     * @projectName Juc_Pro     * @description:  读写锁     * 独占锁(写锁) 一次只能被一个线程占有     * 共享锁(读锁) 多个线程可以同时占有     * ReadWriteLock     * 读-读  可以共存!     * 读-写  不能共存!     * 写-写  不能共存!     * @date 2021/1/293:08 下午     */    public class ReadAndWriteLockUse {        public static void main(String[] args) {            MyCache myCache = new MyCache();                for (int i = 1; i <= 6; i++) {                final int  temp = i;                new Thread(()->{                    myCache.put(String.valueOf(temp),String.valueOf(temp));                },String.valueOf(i)).start();            }                for (int i = 1; i <= 6; i++) {                final int  temp = i;                new Thread(()->{                    myCache.get(String.valueOf(temp));                },String.valueOf(i)).start();            }        }    }        class MyCache{            private final Map<String,String> map = new HashMap<>(16, 0.75F);        private ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();            // 存,写入的时候,只希望同时只有一个线程写        public void put(String key,String value){            reentrantReadWriteLock.writeLock().lock();            try {                System.out.println(Thread.currentThread().getName()+":正在插入"+key);                map.put(key, value);                System.out.println(Thread.currentThread().getName()+":"+key+"插入完成!");            }catch (Exception e){                e.printStackTrace();            }finally {                reentrantReadWriteLock.writeLock().unlock();            }        }        // 取,读,所有人都可以读!        public void get(String key){            reentrantReadWriteLock.readLock().lock();            try{                System.out.println(Thread.currentThread().getName()+":正在读取"+key);                String value = map.get(key);                System.out.println(Thread.currentThread().getName()+":"+key+"读取完成!"+"值:"+value);            }catch (Exception e){                e.printStackTrace();            }finally {                reentrantReadWriteLock.readLock().unlock();            }        }    }
复制代码

阻塞队列


  • 阻塞队列:


⚠️ Tips:会使用 阻塞队列的地方:多线程并发处理,线程池!

四组 API


SynchronousQueue 同步队列

  • 试验代码:


    package icu.lookyousmileface.syncqueue;        import java.util.concurrent.SynchronousQueue;    import java.util.concurrent.TimeUnit;        /**     * @author starrysky     * @title: SyncQueueUse     * @projectName Juc_Pro     * @description: 同步队列     * @date 2021/1/299:17 下午     */    public class SyncQueueUse {        public static void main(String[] args) throws InterruptedException {            SynchronousQueue<String> syncQueue = new SynchronousQueue<>();                //无须for循环会自动产生一种互喂模式            new Thread(() -> {                try {                    System.out.println(Thread.currentThread().getName() + ":" + "A入队列");                    syncQueue.put("A");                        System.out.println(Thread.currentThread().getName() + ":" + "B入队列");                    syncQueue.put("B");                        System.out.println(Thread.currentThread().getName() + ":" + "C入队列");                    syncQueue.put("C");                } catch (InterruptedException e) {                    e.printStackTrace();                }            }, "T1").start();                new Thread(() -> {                try {                    TimeUnit.SECONDS.sleep(1);                    System.out.println(Thread.currentThread().getName() + ":" + syncQueue.take() + "出队列");                    TimeUnit.SECONDS.sleep(1);                    System.out.println(Thread.currentThread().getName() + ":" + syncQueue.take() + "出队列");                    TimeUnit.SECONDS.sleep(1);                    System.out.println(Thread.currentThread().getName() + ":" + syncQueue.take() + "出队列");                } catch (InterruptedException e) {                    e.printStackTrace();                }            }, "T2").start();            }    }
复制代码


⚠️ Tips:


没有容量,进去一个元素,必须等待取出来之后,才能再往里面放一个元素!put、take


发布于: 刚刚阅读数: 2
用户头像

Java、消息中间件、大数据、运维 2022.04.23 加入

华为云云享专家、51CTOtop红人、CSDN博主、2021年第十届“中国软件杯”大学生软件设计大赛-B3-高并发条件下消息队列的设计与实现国赛二等奖、2021年浙江省职业院校技能大赛高职组“大数据技术与应用”赛项一等奖

评论

发布
暂无评论
Java并发JUC(java.util.concurrent)集合不安全_Java_芝士味的椒盐_InfoQ写作社区