写点什么

聊一聊 Java 中那些常见的并发控制手段 (1)

  • 2021 年 11 月 12 日
  • 本文字数:3132 字

    阅读完需:约 10 分钟

private void consumer(LinkedBlockingQueue<Integer> queue) {


try {


// 同步阻塞拿去数据


int val = queue.take();


Thread.sleep(2000);


System.out.println("成功拿到: " + val + " Thread: " + Thread.currentThread());


} catch (InterruptedException e) {


e.printStackTrace();


} finally {


// 添加数据


System.out.println("结束 " + Thread.currentThread());


queue.offer(cnt.getAndAdd(1));


}


}


@Test


public void blockQueue() throws InterruptedException {


LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);


queue.add(cnt.getAndAdd(1));


queue.add(cnt.getAndAdd(1));


new Thread(() -> consumer(queue)).start();


new Thread(() -> consumer(queue)).start();


new Thread(() -> consumer(queue)).start();


new Thread(() -> consumer(queue)).start();


Thread.sleep(10000);


}


复制代码

1.5 信号量 Semaphore

上面队列的实现方式,可以使用信号量Semaphore来完成,通过设置信号量,来控制并发数


private void semConsumer(Semaphore semaphore) {


try {


//同步阻塞,尝试获取信号


semaphore.acquire(1);


System.out.println("成功拿到信号,执行: " + Thread.currentThread());


Thread.sleep(2000);


System.out.println("执行完毕,释放信号: " + Thread.currentThread());


semaphore.release(1);


} catch (Exception e) {


e.printStackTrace();


}


}


@Test


public void semaphore() throws InterruptedException {


Semaphore semaphore = new Semaphore(2);


new Thread(() -> semConsumer(semaphore)).start();


new Thread(() -> semConsumer(semaphore)).start();


new Thread(() -> semConsumer(semaphore)).start();


new Thread(() -> semConsumer(semaphore)).start();


new Thread(() -> semConsumer(semaphore)).start();


Thread.sleep(20_000);


}


复制代码

1.6 计数器 CountDownLatch

计数,应用场景更偏向于多线程的协同,比如多个线程执行完毕之后,再处理某些事情;不同于上面的并发数的控制,它和栅栏一样,更多的是行为结果的统一


这种场景下的使用姿势一般如下


  • 重点:countDownLatch 计数为 0 时放行


@Test


public void countDown() throws InterruptedException {


CountDownLatch countDownLatch = new CountDownLatch(2);


new Thread(() -> {


try {


System.out.println("do something in " + Thread.currentThread());


Thread.sleep(2000);


} catch (InterruptedException e) {


e.printStackTrace();


} finally {


countDownLatch.countDown();


}


}).start();


new Thread(() -> {


try {


System.out.println("do something in t2: " + Thread.currentThread());


Thread.sleep(1000);


} catch (InterruptedException e) {


e.printStackTrace();


} finally {


countDownLatch.countDown();


}


}).start();


countDownLatch.await();


System.out.printf("结束");


}


复制代码

1.7 栅栏 CyclicBarrier

CyclicBarrier 的作用与上面的 CountDownLatch 相似,区别在于正向计数+1, 只有达到条件才放行; 且支持通过调用reset()重置计数,而 CountDownLatch 则不行


一个简单的 demo


private void cyclicBarrierLogic(CyclicBarrier barrier, long sleep) {


// 等待达到条件才放行


try {


System.out.println("准备执行: " + Thread.currentThread() + " at: " + LocalDateTime.now());


Thread.sleep(sleep);


int index = barrier.await();


System.out.println("开始执行: " + index + " thread: " + Thread.currentThread() + " at: " + LocalDateTime.now());


} catch (Exception e) {


e.printStackTrace();


}


}


@Test


public void testCyclicBarrier() throws InterruptedException {


// 到达两个工作线程才能继续往后面执行


CyclicBarrier barrier = new CyclicBarrier(2);


// 三秒之后,下面两个线程的才会输出 开始执行


new Thread(() -> cyclicBarrierLogic(barrier, 1000)).start();


new Thread(() -> cyclicBarrierLogic(barrier, 3000)).start();


Thread.sleep(4000);


// 重置,可以再次使用


barrier.reset();


new Thread(() -> cyclicBarrierLogic(barrier, 1)).start();


new Thread(() -> cyclicBarrierLogic(barrier, 1)).start();


Thread.sleep(10000);


}


复制代码

1.8 guava 令牌桶

guava 封装了非常简单的并发控制工具类 RateLimiter,作为单机的并发控制首选


一个控制 qps 为 2 的简单 demo 如下:


private void guavaProcess(RateLimiter rateLimiter) {


try {


// 同步阻塞方式获取


System.out.println("准备执行: " + Thread.currentThread() + " > " + LocalDateTime.now());


rateLimiter.acquire();


System.out.println("执行中: " + Thread.currentThread() + " > " + LocalDa


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


teTime.now());


} catch (Exception e) {


e.printStackTrace();


}


}


@Test


public void testGuavaRate() throws InterruptedException {


// 1s 中放行两个请求


RateLimiter rateLimiter = RateLimiter.create(2.0d);


new Thread(() -> guavaProcess(rateLimiter)).start();


new Thread(() -> guavaProcess(rateLimiter)).start();


new Thread(() -> guavaProcess(rateLimiter)).start();


new Thread(() -> guavaProcess(rateLimiter)).start();


new Thread(() -> guavaProcess(rateLimiter)).start();


new Thread(() -> guavaProcess(rateLimiter)).start();


new Thread(() -> guavaProcess(rateLimiter)).start();


Thread.sleep(20_000);


}


复制代码


输出:


准备执行: Thread[Thread-2,5,main] > 2021-04-13T10:18:05.263


准备执行: Thread[Thread-1,5,main] > 2021-04-13T10:18:05.263


准备执行: Thread[Thread-5,5,main] > 2021-04-13T10:18:05.264


准备执行: Thread[Thread-7,5,main] > 2021-04-13T10:18:05.264


准备执行: Thread[Thread-3,5,main] > 2021-04-13T10:18:05.263


准备执行: Thread[Thread-4,5,main] > 2021-04-13T10:18:05.264


准备执行: Thread[Thread-6,5,main] > 2021-04-13T10:18:05.263


执行中: Thread[Thread-2,5,main] > 2021-04-13T10:18:05.267


执行中: Thread[Thread-6,5,main] > 2021-04-13T10:18:05.722


执行中: Thread[Thread-4,5,main] > 2021-04-13T10:18:06.225


执行中: Thread[Thread-3,5,main] > 2021-04-13T10:18:06.721


执行中: Thread[Thread-7,5,main] > 2021-04-13T10:18:07.221


执行中: Thread[Thread-5,5,main] > 2021-04-13T10:18:07.720


执行中: Thread[Thread-1,5,main] > 2021-04-13T10:18:08.219


复制代码

1.9 滑动窗口 TimeWindow

没有找到通用的滑动窗口 jar 包,一般来讲滑动窗口更适用于平滑的限流,解决瞬时高峰问题


一个供参考的实现方式:


固定大小队列,队列中每个数据代表一个时间段的计数,


访问 -》 队列头拿数据(注意不出队)-》判断是否跨时间段 -》 同一时间段,计数+1 -》跨时间段,新增数据入队,若扔不进去,表示时间窗满,队尾数据出队


问题:当流量稀疏时,导致不会自动释放过期的数据 解决方案:根据时间段设置定时任务,模拟访问操作,只是将计数改为 + 0

1.10 小结

本文给出了几种单机版的并发控制的技术手段,主要目的是介绍了一些可选的方案,技术细节待后续补全完善,当然如果有其他的建议,欢迎评论交流


II. 其他



1. 一灰灰Blogblog.hhui.top

一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛

2. 声明

尽信书则不如,以上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现 bug 或者有更好的建议,欢迎批评指正,不吝感激作者:一灰灰


链接:https://juejin.cn/post/6995710761787981831


来源:掘金


著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

评论

发布
暂无评论
聊一聊Java中那些常见的并发控制手段(1)