写点什么

聊一聊 Java 中那些常见的并发控制手段

  • 2021 年 11 月 12 日
  • 本文字数:3070 字

    阅读完需:约 10 分钟

System.out.println("执行完毕! " + Thread.currentThread().getName());


writeLock.unlock();


} catch (Exception e) {


e.printStackTrace();


}


}


@Test


public void lock() throws InterruptedException {


ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();


new Thread(()->doSome(reentrantReadWriteLock.writeLock())).start();


new Thread(()->doSome(reentrantReadWriteLock.writeLock())).start();


new Thread(()->doSome(reentrantReadWriteLock.writeLock())).start();


Thread.sleep(20000);


}


复制代码

1.4 阻塞队列

借助同步阻塞队列,也可以实现并发控制的效果,比如队列中初始化 n 个元素,每次消费从队列中获取一个元素,如果拿不到则阻塞;执行完毕之后,重新塞入一个元素,这样就可以实现一个简单版的并发控制


demo 版演示,下面指定队列长度为 2,表示最大并发数控制为 2;设置为 1 时,可以实现单线程的访问控制


AtomicInteger cnt = new AtomicInteger();


private void consumer(LinkedBlockingQueue<Integer> queue) {


try {


// 同步阻塞拿去数据


int val = queue.take();


Thread.sleep(2000);


System.out.println("成功拿到: " + val + " Thread: " + Thread.currentThread());


} catch (InterruptedException e) {


e.printStackTrace();


} finally {


// 添加数据


System.out.println("结束 " + Thread.currentThread());


queue.offer(cnt.getAndAdd(1));


}


}


@Test


public void blockQueue() throws InterruptedException {


LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);


queue.add(cnt.getAndAdd(1));


queue.add(cnt.getAndAdd(1));


new Thread(() -> consumer(queue)).start();


new Thread(() -> consumer(queue)).start();


new Thread(() -> consumer(queue)).start();


new Thread(() -> consumer(queue)).start();


Thread.sleep(10000);


}


复制代码

1.5 信号量 Semaphore

上面队列的实现方式,可以使用信号量Semaphore来完成,通过设置信号量,来控制并发数


private void semConsumer(Semaphore semaphore) {


try {


//同步阻塞,尝试获取信号


semaphore.acquire(1);


System.out.println("成功拿到信号,执行: " + Thread.currentThread());


Thread.sleep(2000);


System.out.println("执行完毕,释放信号: " + Thread.currentThread());


semaphore.release(1);


} catch (Exception e) {


e.printStackTrace();


}


}


@Test


public void semaphore() throws InterruptedException {


Semaphore semaphore = new Semaphore(2);


new Thread(() -> semConsumer(semaphore)).start();


new Thread(() -> semConsumer(semaphore)).start();


new Thread(() -> semConsumer(semaphore)).start();


new Thread(() -> semConsumer(semaphore)).start();


new Thread(() -> semConsumer(semaphore)).start();


Thread.sleep(20_000);


}


复制代码

1.6 计数器 CountDownLatch

计数,应用场景更偏向于多线程的协同,比如多个线程执行完毕之后,再处理某些事情;不同于上面的并发数的控制,它和栅栏一样,更多的是行为结果的统一


这种场景下的使用姿势一般如下


  • 重点:countDownLatch 计数为 0 时放行


@Test


public void countDown() throws InterruptedException {


CountDownLatch countDownLatch = new CountDownLatch(2);


new Thread(() -> {


try {


System.out.println("do something in " + Thread.currentThread());


Thread.sleep(2000);


} catch (InterruptedException e) {


e.printStackTrace();


} finally {


countDownLatch.countDown();


}


}).start();


new Thread(() -> {


try {


System.out.println("do something in t2: " + Thread.currentThread());


Thread.sleep(1000);


} catch (InterruptedException e) {


e.printStackTrace();


} finally {


countDownLatch.countDown();


}


}).start();


countDownLatch.await();


System.out.printf("结束");


}


复制代码

1.7 栅栏 CyclicBarrier

CyclicBarrier 的作用与上面的 CountDownLatch 相似,区别在于正向计数+1, 只有达到条件才放行; 且支持通过调用reset()重置计数,而 CountDownLatch 则不行


一个简单的 demo


private void cyclicBarrierLogic(CyclicBarrier barrier, long sleep) {


// 等待达到条件才放行


try {


System.out.println("准备执行: " + Thread.currentThread() + " at: " + LocalDateTime.now());


Thread.sleep(sleep);


int index = barrier.await();


System.out.println("开始执行: " + index + " thread: " + Thread.currentThread() + " at: " + LocalDateTime.now());


} catch (Exception e) {


e.printStackTrace();


}


}


@Test


public void testCyclicBarrier() throws InterruptedException {


// 到达两个工作线程才能继续往后面执行


CyclicBarrier barrier = new CyclicBarrier(2);


// 三秒之后,下面两个线程的才会输出 开始执行


new Thread(() -> cyclicBarrierLogic(barrier, 1000)).start();


new Thread(() -> cyclicBarrierLogic(barrier, 3000)).start();


Thread.sleep(4000);


// 重置,可以再次使用


barrier.reset();


new Thread(() -> cyclicBarrierLogic(barrier, 1)).start();


new Thread(() -> cyclicBarrierLogic(barrier, 1)).start();


Thread.sleep(10000);


}


复制代码

1.8 guava 令牌桶

guava 封装了非常简单的并发控制工具类 RateLimiter,作为单机的并发控制首选


一个控制 qps 为 2 的简单 demo 如下:


private void guavaProcess(RateLimiter rateLimiter) {


try {


// 同步阻塞方式获取


System.out.println("准备执行: " + Thread.currentThread() + " > " + LocalDateTime.now());


rateLimiter.acquire();


System.out.println("执行中: " + Thread.currentThread() + " > " + LocalDateTime.now());


} catch (Exception e) {


e.printStackTrace();


}


}


@Test


public void testGuavaRate() throws InterruptedException {


// 1s 中放行两个请求


RateLimiter rateLimiter = RateLimiter.create(2


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


.0d);


new Thread(() -> guavaProcess(rateLimiter)).start();


new Thread(() -> guavaProcess(rateLimiter)).start();


new Thread(() -> guavaProcess(rateLimiter)).start();


new Thread(() -> guavaProcess(rateLimiter)).start();


new Thread(() -> guavaProcess(rateLimiter)).start();


new Thread(() -> guavaProcess(rateLimiter)).start();


new Thread(() -> guavaProcess(rateLimiter)).start();


Thread.sleep(20_000);


}


复制代码


输出:


准备执行: Thread[Thread-2,5,main] > 2021-04-13T10:18:05.263


准备执行: Thread[Thread-1,5,main] > 2021-04-13T10:18:05.263


准备执行: Thread[Thread-5,5,main] > 2021-04-13T10:18:05.264


准备执行: Thread[Thread-7,5,main] > 2021-04-13T10:18:05.264


准备执行: Thread[Thread-3,5,main] > 2021-04-13T10:18:05.263


准备执行: Thread[Thread-4,5,main] > 2021-04-13T10:18:05.264


准备执行: Thread[Thread-6,5,main] > 2021-04-13T10:18:05.263


执行中: Thread[Thread-2,5,main] > 2021-04-13T10:18:05.267


执行中: Thread[Thread-6,5,main] > 2021-04-13T10:18:05.722


执行中: Thread[Thread-4,5,main] > 2021-04-13T10:18:06.225


执行中: Thread[Thread-3,5,main] > 2021-04-13T10:18:06.721


执行中: Thread[Thread-7,5,main] > 2021-04-13T10:18:07.221


执行中: Thread[Thread-5,5,main] > 2021-04-13T10:18:07.720


执行中: Thread[Thread-1,5,main] > 2021-04-13T10:18:08.219


复制代码

评论

发布
暂无评论
聊一聊Java中那些常见的并发控制手段