聊一聊 Java 中那些常见的并发控制手段
System.out.println("执行完毕! " + Thread.currentThread().getName());
writeLock.unlock();
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void lock() throws InterruptedException {
ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
new Thread(()->doSome(reentrantReadWriteLock.writeLock())).start();
new Thread(()->doSome(reentrantReadWriteLock.writeLock())).start();
new Thread(()->doSome(reentrantReadWriteLock.writeLock())).start();
Thread.sleep(20000);
}
复制代码
1.4 阻塞队列
借助同步阻塞队列,也可以实现并发控制的效果,比如队列中初始化 n 个元素,每次消费从队列中获取一个元素,如果拿不到则阻塞;执行完毕之后,重新塞入一个元素,这样就可以实现一个简单版的并发控制
demo 版演示,下面指定队列长度为 2,表示最大并发数控制为 2;设置为 1 时,可以实现单线程的访问控制
AtomicInteger cnt = new AtomicInteger();
private void consumer(LinkedBlockingQueue<Integer> queue) {
try {
// 同步阻塞拿去数据
int val = queue.take();
Thread.sleep(2000);
System.out.println("成功拿到: " + val + " Thread: " + Thread.currentThread());
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 添加数据
System.out.println("结束 " + Thread.currentThread());
queue.offer(cnt.getAndAdd(1));
}
}
@Test
public void blockQueue() throws InterruptedException {
LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2);
queue.add(cnt.getAndAdd(1));
queue.add(cnt.getAndAdd(1));
new Thread(() -> consumer(queue)).start();
new Thread(() -> consumer(queue)).start();
new Thread(() -> consumer(queue)).start();
new Thread(() -> consumer(queue)).start();
Thread.sleep(10000);
}
复制代码
1.5 信号量 Semaphore
上面队列的实现方式,可以使用信号量Semaphore
来完成,通过设置信号量,来控制并发数
private void semConsumer(Semaphore semaphore) {
try {
//同步阻塞,尝试获取信号
semaphore.acquire(1);
System.out.println("成功拿到信号,执行: " + Thread.currentThread());
Thread.sleep(2000);
System.out.println("执行完毕,释放信号: " + Thread.currentThread());
semaphore.release(1);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void semaphore() throws InterruptedException {
Semaphore semaphore = new Semaphore(2);
new Thread(() -> semConsumer(semaphore)).start();
new Thread(() -> semConsumer(semaphore)).start();
new Thread(() -> semConsumer(semaphore)).start();
new Thread(() -> semConsumer(semaphore)).start();
new Thread(() -> semConsumer(semaphore)).start();
Thread.sleep(20_000);
}
复制代码
1.6 计数器 CountDownLatch
计数,应用场景更偏向于多线程的协同,比如多个线程执行完毕之后,再处理某些事情;不同于上面的并发数的控制,它和栅栏一样,更多的是行为结果的统一
这种场景下的使用姿势一般如下
重点:countDownLatch 计数为 0 时放行
@Test
public void countDown() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(2);
new Thread(() -> {
try {
System.out.println("do something in " + Thread.currentThread());
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}).start();
new Thread(() -> {
try {
System.out.println("do something in t2: " + Thread.currentThread());
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
countDownLatch.countDown();
}
}).start();
countDownLatch.await();
System.out.printf("结束");
}
复制代码
1.7 栅栏 CyclicBarrier
CyclicBarrier 的作用与上面的 CountDownLatch 相似,区别在于正向计数+1, 只有达到条件才放行; 且支持通过调用reset()
重置计数,而 CountDownLatch 则不行
一个简单的 demo
private void cyclicBarrierLogic(CyclicBarrier barrier, long sleep) {
// 等待达到条件才放行
try {
System.out.println("准备执行: " + Thread.currentThread() + " at: " + LocalDateTime.now());
Thread.sleep(sleep);
int index = barrier.await();
System.out.println("开始执行: " + index + " thread: " + Thread.currentThread() + " at: " + LocalDateTime.now());
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testCyclicBarrier() throws InterruptedException {
// 到达两个工作线程才能继续往后面执行
CyclicBarrier barrier = new CyclicBarrier(2);
// 三秒之后,下面两个线程的才会输出 开始执行
new Thread(() -> cyclicBarrierLogic(barrier, 1000)).start();
new Thread(() -> cyclicBarrierLogic(barrier, 3000)).start();
Thread.sleep(4000);
// 重置,可以再次使用
barrier.reset();
new Thread(() -> cyclicBarrierLogic(barrier, 1)).start();
new Thread(() -> cyclicBarrierLogic(barrier, 1)).start();
Thread.sleep(10000);
}
复制代码
1.8 guava 令牌桶
guava 封装了非常简单的并发控制工具类 RateLimiter,作为单机的并发控制首选
一个控制 qps 为 2 的简单 demo 如下:
private void guavaProcess(RateLimiter rateLimiter) {
try {
// 同步阻塞方式获取
System.out.println("准备执行: " + Thread.currentThread() + " > " + LocalDateTime.now());
rateLimiter.acquire();
System.out.println("执行中: " + Thread.currentThread() + " > " + LocalDateTime.now());
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testGuavaRate() throws InterruptedException {
// 1s 中放行两个请求
RateLimiter rateLimiter = RateLimiter.create(2
.0d);
new Thread(() -> guavaProcess(rateLimiter)).start();
new Thread(() -> guavaProcess(rateLimiter)).start();
new Thread(() -> guavaProcess(rateLimiter)).start();
new Thread(() -> guavaProcess(rateLimiter)).start();
new Thread(() -> guavaProcess(rateLimiter)).start();
new Thread(() -> guavaProcess(rateLimiter)).start();
new Thread(() -> guavaProcess(rateLimiter)).start();
Thread.sleep(20_000);
}
复制代码
输出:
准备执行: Thread[Thread-2,5,main] > 2021-04-13T10:18:05.263
准备执行: Thread[Thread-1,5,main] > 2021-04-13T10:18:05.263
准备执行: Thread[Thread-5,5,main] > 2021-04-13T10:18:05.264
准备执行: Thread[Thread-7,5,main] > 2021-04-13T10:18:05.264
准备执行: Thread[Thread-3,5,main] > 2021-04-13T10:18:05.263
准备执行: Thread[Thread-4,5,main] > 2021-04-13T10:18:05.264
准备执行: Thread[Thread-6,5,main] > 2021-04-13T10:18:05.263
执行中: Thread[Thread-2,5,main] > 2021-04-13T10:18:05.267
执行中: Thread[Thread-6,5,main] > 2021-04-13T10:18:05.722
执行中: Thread[Thread-4,5,main] > 2021-04-13T10:18:06.225
执行中: Thread[Thread-3,5,main] > 2021-04-13T10:18:06.721
执行中: Thread[Thread-7,5,main] > 2021-04-13T10:18:07.221
执行中: Thread[Thread-5,5,main] > 2021-04-13T10:18:07.720
执行中: Thread[Thread-1,5,main] > 2021-04-13T10:18:08.219
复制代码
评论