在 Java 并发编程中,锁是确保线程安全、协调多线程访问共享资源的关键机制。从基本的 synchronized
同步关键字到高级的 ReentrantLock
、读写锁 ReadWriteLock
、无锁设计如 AtomicInteger
,再到复杂的同步辅助工具如 CountDownLatch
、CyclicBarrier
和 Semaphore
,每种锁都针对特定的并发场景设计,以解决多线程环境下的同步问题。StampedLock
提供了乐观读锁和悲观写锁的选项,而 ConcurrentHashMap
和 ConcurrentLinkedQueue
等并发集合则通过内部机制优化了并发访问。了解不同锁的特点和适用场景,对于构建高效、稳定的并发应用程序至关重要。
肖哥弹架构 跟大家“弹弹” 高并发锁, 关注公号回复 'mvcc' 获得手写数据库事务代码
欢迎 点赞,关注,评论。
关注公号 Solomon 肖哥弹架构获取更多精彩内容
历史热点文章
1、锁选择维度
选择适合的锁通常依赖于特定的应用场景和并发需求。以下是一个表格,概述了不同锁类型的关键特性和选择它们的考量维度:
2、锁详细分析
2.7. CyclicBarrier
CyclicBarrier
是 Java 中用于线程间同步的一种工具,它允许一组线程互相等待,直到所有线程都到达一个公共屏障点。
图解说明:
Java 线程:表示运行中的线程,它们可能需要在某个点同步。
CyclicBarrier 实例:是 CyclicBarrier
类的实例,用于协调一组线程在屏障点同步。
屏障:表示线程需要到达的同步点,所有线程必须到达这个点才能继续执行。
共享资源或任务:表示线程需要访问的共享资源或执行的任务,它们在屏障点同步后可以安全地执行。
等待区:表示等待其他线程到达屏障点的线程集合。
计数器:CyclicBarrier
内部维护一个计数器,用于跟踪尚未到达屏障点的线程数量。
屏障动作(Runnable) :可选的,当所有线程到达屏障点时,可以执行一个特定的动作或任务。
综合说明:
作用:CyclicBarrier
是一种同步帮助工具,允许一组线程相互等待,直到所有线程都到达某个公共屏障点。
背景:在需要多个线程协作完成任务时,CyclicBarrier
提供了一种机制,使得所有线程可以在屏障点同步,然后继续执行。
优点:
可重复使用:与 CountDownLatch
不同,CyclicBarrier
可以重复使用,适用于周期性的任务同步。
支持屏障动作:可以设置一个在所有线程到达屏障点后执行的回调。
缺点:
可能导致死锁:如果一个或多个线程未到达屏障点,其他线程将一直等待。
复杂性:需要合理设计以避免线程永久等待。
场景:适用于需要周期性同步多个线程的场景。
业务举例:在多阶段数据处理流程中,每个阶段需要所有数据都准备好后才能开始处理。使用CyclicBarrier
可以确保所有数据加载线程在每个阶段开始前都已准备好。
使用方式:
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class Race {
private final CyclicBarrier barrier;
public Race(int numberOfRunners) {
barrier = new CyclicBarrier(numberOfRunners, () -> {
System.out.println("比赛开始!");
// 这里可以放置所有参与者到达屏障后要执行的操作
});
}
public void run() {
System.out.println("等待其他参赛者...");
try {
barrier.await(); // 等待其他线程
System.out.println("开始跑步!");
// 跑步时间
Thread.sleep((long) (Math.random() * 10000));
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
final int numberOfRunners = 5;
Race race = new Race(numberOfRunners);
// 创建参赛者线程
for (int i = 0; i < numberOfRunners; i++) {
final int runnerNumber = i + 1;
new Thread(() -> {
System.out.println("参赛者 " + runnerNumber + " 已准备就绪");
race.run();
}).start();
}
}
}
复制代码
业务代码案例:
业务说明: 在大数据处理系统中,经常需要对大量数据进行多阶段处理,例如,数据清洗、转换、聚合和加载。这些处理阶段通常需要按顺序执行,且每个阶段开始前必须确保所有数据都已准备好。
为什么需要 CyclicBarrier
技术: 在多阶段数据处理的场景中,不同的处理任务可能由不同的线程执行,而这些线程的执行时间可能不同。CyclicBarrier
允许每个阶段的处理在开始前等待所有相关线程完成上一阶段的任务,确保数据的一致性和完整性。
没有 CyclicBarrier
技术会带来什么后果:
没有使用 CyclicBarrier
或其他同步协调机制可能会导致以下问题:
数据不一致:如果后续阶段的处理在前一阶段的数据未完全准备好时开始,可能会导致处理结果不准确。
资源浪费:在等待数据准备的过程中,系统资源可能被无效占用,导致资源利用效率低下。
错误和异常:由于阶段间的依赖关系没有得到妥善处理,可能会引发程序错误或运行时异常。
代码实现:
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class DataProcessingPipeline {
private final ExecutorService executor = Executors.newFixedThreadPool(4);
private final CyclicBarrier barrier;
private final int numberOfPhases;
private final int numberOfTasks;
public DataProcessingPipeline(int numberOfTasks, int numberOfPhases) {
this.numberOfTasks = numberOfTasks;
this.numberOfPhases = numberOfPhases;
this.barrier = new CyclicBarrier(numberOfTasks, () -> {
System.out.println("一个阶段完成,准备进入下一阶段");
});
}
public void processData() throws Exception {
for (int phase = 1; phase <= numberOfPhases; phase++) {
System.out.println("阶段 " + phase + " 开始");
for (int task = 0; task < numberOfTasks; task++) {
final int currentTask = task;
executor.submit(() -> {
try {
// 数据处理任务
System.out.println("任务 " + currentTask + " 在阶段 " + phase + " 执行");
Thread.sleep((long) (Math.random() * 5000));
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
barrier.await(); // 等待所有任务完成
}
executor.shutdown();
}
public static void main(String[] args) {
DataProcessingPipeline pipeline = new DataProcessingPipeline(4, 3);
try {
pipeline.processData();
} catch (Exception e) {
e.printStackTrace();
}
}
}
复制代码
2.8. Atomic Variables
原子变量是 Java 中 java.util.concurrent.atomic
包提供的一些类,它们利用底层硬件的原子性指令来保证操作的原子性,无需使用锁。
图解说明:
Java 线程:表示运行中的线程,它们可能需要对共享资源进行原子操作。
Atomic Variables:表示原子变量的集合,包括 AtomicInteger
、AtomicLong
、AtomicReference
等。
AtomicInteger、AtomicLong、AtomicReference:分别表示整型、长整型和引用类型的原子变量。
硬件支持的原子指令:底层硬件提供的原子性指令,如 compare-and-swap (CAS)、load-linked、store-conditional 等。
共享资源:表示被多个线程共享的数据,如计数器、累加器等。
内存:表示 Java 程序使用的内存空间,包括堆和栈等。
变量状态:表示原子变量在内存中的当前状态。
综合说明:
作用:原子变量类(如 AtomicInteger
, AtomicLong
, AtomicReference
等)提供了一种机制,使得对变量的某些操作(如自增、自减、读取和写入)是原子性的,无需使用传统的锁。
背景:在多线程环境中,对共享变量的并发访问需要同步措施以防止数据竞争。原子变量利用底层硬件的原子指令来保证操作的原子性,从而简化了线程同步。
优点:
无锁设计:避免使用传统锁,减少了线程切换的开销。
性能优化:对于高竞争的简单变量访问,原子变量通常比锁有更好的性能。
缺点:
功能限制:仅适用于简单的操作,复杂的操作无法通过原子变量实现。
可组合性问题:复杂的原子操作需要仔细设计,否则可能引入竞态条件。
场景:适用于对简单变量进行原子操作的场景,如计数器、累加器等。
业务举例:在电商平台的库存管理中,AtomicInteger
可以用来原子地更新商品的库存数量,确保在高并发环境下库存数据的一致性。
使用方式:
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
public class Counter {
// 使用 AtomicInteger 来确保计数器的线程安全
private final AtomicInteger count = new AtomicInteger(0);
// 提供一个方法来增加计数器的值
public void increment() {
// 原子地增加计数器的值
count.incrementAndGet();
}
// 提供一个方法来获取当前计数器的值
public int getCount() {
// 原子地获取计数器的值
return count.get();
}
}
public class DataStore {
// 使用 AtomicLong 来统计数据总量
private final AtomicLong dataCount = new AtomicLong(0);
public void addData(long size) {
// 原子地将数据大小累加到总量
dataCount.addAndGet(size);
}
public long getDataCount() {
// 原子地获取当前数据总量
return dataCount.get();
}
}
// 测试类
public class AtomicVariablesDemo {
public static void main(String[] args) {
Counter counter = new Counter();
DataStore dataStore = new DataStore();
// 多线程环境中对计数器和数据总量的更新
for (int i = 0; i < 10; i++) {
new Thread(() -> {
counter.increment();
dataStore.addData(100); // 假设每次操作增加100单位数据
}).start();
}
// 等待所有线程完成
while (Thread.activeCount() > 1) {
Thread.yield();
}
// 输出计数器的值和数据总量
System.out.println("Counter value: " + counter.getCount());
System.out.println("Data store size: " + dataStore.getDataCount());
}
}
复制代码
业务代码案例:
场景描述:社交网络的实时消息计数
业务说明: 社交网络平台需要显示每个用户的实时消息通知数。每当用户收到新消息时,消息计数需要增加;用户阅读消息时,计数可能会减少或被重置。此计数需要对所有用户可见,且在高并发环境下保持准确。
为什么需要 Atomic Variables
技术: 在社交网络中,多个用户可能同时发送消息给同一个接收者,或者一个用户可能同时在多个设备上接收消息。这导致对消息计数的读取和更新操作非常频繁。使用 AtomicInteger
可以确保消息计数更新的原子性,并且在多线程环境下保持数据的一致性。
没有 Atomic Variables
技术会带来什么后果:
没有使用 Atomic Variables
或其他并发控制机制可能会导致以下问题:
数据不一致:消息计数可能会出错,导致用户看到不正确的消息数量。
用户体验下降:如果消息通知不准确,用户可能会错过重要通知,或者对应用的可靠性产生怀疑。
系统复杂度增加:在没有有效同步机制的情况下,维护数据一致性将变得复杂且容易出错。
代码实现:
import java.util.concurrent.atomic.AtomicInteger;
public class MessageNotificationCounter {
private final AtomicInteger messageCount = new AtomicInteger(0);
// 接收新消息时调用此方法
public void receiveMessage() {
// 原子地增加消息计数
messageCount.incrementAndGet();
System.out.println("New message received. Total messages: " + messageCount.get());
}
// 用户阅读消息时调用此方法
public void messagesRead() {
// 原子地减少消息计数
messageCount.decrementAndGet();
System.out.println("Messages read. Remaining messages: " + messageCount.get());
}
// 获取当前消息计数
public int getMessageCount() {
return messageCount.get();
}
}
// 测试类
public class AtomicVariablesDemo {
public static void main(String[] args) {
MessageNotificationCounter counter = new MessageNotificationCounter();
// 多个用户同时发送消息
Thread sender1 = new Thread(() -> {
counter.receiveMessage();
});
Thread sender2 = new Thread(() -> {
counter.receiveMessage();
});
// 用户阅读消息
Thread reader = new Thread(() -> {
counter.messagesRead();
});
sender1.start();
sender2.start();
reader.start();
sender1.join();
sender2.join();
reader.join();
System.out.println("Final message count: " + counter.getMessageCount());
}
}
复制代码
2.9. ConcurrentHashMap
ConcurrentHashMap
是 Java 中一个线程安全的哈希表,它通过分段锁(Segmentation)和 CAS 操作来支持高并发的读写操作。
图解说明:
Java 线程:表示运行中的线程,它们可能需要对 ConcurrentHashMap
进行读写操作。
ConcurrentHashMap 实例:是 ConcurrentHashMap
类的实例,用于存储键值对并提供线程安全的访问。
Segment 数组:ConcurrentHashMap
将哈希表分为多个段(Segment),每个段维护一部分哈希桶,通过分段锁减少锁的竞争。
Hash 桶:存储哈希桶数组,每个桶可以包含一个或多个键值对。
链表或红黑树:在哈希桶中,键值对最初以链表形式存储,当链表长度超过阈值时,链表可能会被转换为红黑树以提高搜索效率。
共享资源:表示存储在 ConcurrentHashMap
中的键值对数据。
读操作:线程可以并发地读取 ConcurrentHashMap
中的数据,在读多写少的场景下,读操作不会阻塞其他读操作。
写操作:线程对 ConcurrentHashMap
的写入操作,写操作需要获取相应段的锁。
锁:每个段拥有自己的锁,写操作需要获取锁,而读操作通常不需要。
升级设计说明:
Java 1.7 ConcurrentHashMap 锁机制
在 Java 1.7 中,ConcurrentHashMap
使用分段锁机制,其中每个段相当于一个小的哈希表,拥有自己的锁。
Java 1.8 ConcurrentHashMap 锁机制
在 Java 1.8 中,ConcurrentHashMap
摒弃了分段锁机制,采用了 CAS 和 synchronized
来确保线程安全。
综合说明:
作用:ConcurrentHashMap
是 Java 中提供的一个线程安全的哈希表,它通过分段锁的概念来允许并发的读写操作,从而提高并发访问的性能。
背景:传统的 HashMap
在多线程环境下需要外部同步,而 ConcurrentHashMap
通过锁分离技术减少了锁的竞争,提供了更好的并发性能。
优点:
高并发:通过细分锁到段,允许多个线程同时操作不同段的数据。
动态扩容:内部采用动态数组和链表结构,提高了空间和时间效率。
缺点:
复杂度高:实现复杂,需要维护多个锁和复杂的数据结构。
性能调优:在极端高并发场景下,可能需要调整默认的并发级别。
场景:适用于需要高并发访问的缓存或数据存储。
业务举例:在大数据处理系统中,ConcurrentHashMap
可以用来存储实时计算结果,支持大量并发的读写操作而不会导致性能瓶颈。
使用方式:
import java.util.concurrent.ConcurrentHashMap;
import java.util.Map;
public class ConcurrentHashMapDemo {
// 创建一个 ConcurrentHashMap 实例
private final ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
// 将一个键值对插入到 Map 中
public void put(String key, Integer value) {
// put 方法是线程安全的
map.put(key, value);
}
// 从 Map 中获取与指定键关联的值
public Integer get(String key) {
// get 方法是线程安全的
return map.get(key);
}
// 计算 Map 中的元素数量
public int size() {
// size 方法是线程安全的
return map.size();
}
// 演示删除操作
public void remove(String key) {
// remove 方法是线程安全的
map.remove(key);
}
// 演示如何批量添加数据
public void addAll(Map<String, Integer> newData) {
// putAll 方法是线程安全的
map.putAll(newData);
}
public static void main(String[] args) {
ConcurrentHashMapDemo demo = new ConcurrentHashMapDemo();
// 批量添加数据
demo.addAll(Map.of("key1", 1, "key2", 2, "key3", 3));
// 单独添加一条数据
demo.put("key4", 4);
// 获取并打印一条数据
System.out.println("Value for 'key1': " + demo.get("key1"));
// 获取 Map 的大小
System.out.println("Map size: " + demo.size());
// 删除一条数据
demo.remove("key2");
// 再次获取 Map 的大小
System.out.println("Map size after removal: " + demo.size());
}
}
复制代码
业务代码案例:
业务说明: 在分布式缓存系统中,经常需要存储和检索用户会话信息、应用配置、热点数据等。这些数据需要被多个应用实例共享,并且要求在高并发环境下依然保持高性能。缓存数据通常有过期时间,需要定期清理。
为什么需要 ConcurrentHashMap
技术: ConcurrentHashMap
提供了一种高效的方式来处理并发的读取和更新操作,并且它的分段锁机制允许多个线程同时对不同段进行操作,从而提高并发处理能力。此外,ConcurrentHashMap
在 Java 8 中引入的红黑树结构使得即使在高并发更新导致哈希冲突时,也能保持高效的性能。
没有 ConcurrentHashMap
技术会带来什么后果:
没有使用 ConcurrentHashMap
可能会导致以下问题:
性能瓶颈:在高并发环境下,如果使用 HashMap
加 synchronized
,可能导致严重的性能瓶颈,因为所有线程必须等待一个锁。
数据不一致:在没有适当同步的情况下,多个线程同时更新数据可能导致缓存数据不一致。
扩展性差:随着系统负载的增加,基于 HashMap
的缓存解决方案可能难以扩展,因为锁竞争和线程安全问题。
代码实现:
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
public class DistributedCache<K, V> {
private final ConcurrentHashMap<K, V> cacheMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<K, Long> expirationMap = new ConcurrentHashMap<>();
public void put(K key, V value, long ttl) {
cacheMap.put(key, value);
expirationMap.put(key, System.currentTimeMillis() + ttl);
scheduleEviction(key, ttl);
}
public V get(K key) {
Long expirationTime = expirationMap.get(key);
if (expirationTime == null || expirationTime < System.currentTimeMillis()) {
cacheMap.remove(key);
expirationMap.remove(key);
return null;
}
return cacheMap.get(key);
}
private void scheduleEviction(final K key, final long ttl) {
new Thread(() -> {
try {
TimeUnit.MILLISECONDS.sleep(ttl);
cacheMap.computeIfPresent(key, (k, v) -> null);
expirationMap.remove(key);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
public static void main(String[] args) {
DistributedCache<String, String> cache = new DistributedCache<>();
cache.put("userSession", "sessionData", 5000); // 缓存设置5秒过期
// 多个线程并发访问缓存
for (int i = 0; i < 100; i++) {
int finalI = i;
new Thread(() -> {
String result = cache.get("userSession");
System.out.println("Thread " + finalI + " retrieved: " + result);
}).start();
}
}
}
复制代码
2.10.ConcurrentSkipListMap
ConcurrentSkipListMap
是 Java 中实现的一个高性能并发的有序映射,它使用跳表(Skip List)作为其底层数据结构。
图解说明:
Java 线程:表示运行中的线程,它们可能需要对 ConcurrentSkipListMap
进行读写操作。
ConcurrentSkipListMap 实例:是 ConcurrentSkipListMap
类的实例,用于存储键值对并提供线程安全的访问。
Skip List 层级结构:跳表由多层索引构成,每一层都是一个有序的链表。
索引层:跳表中的索引层,用于加速搜索操作。
数据层:跳表中的底层数据结构,存储实际的键值对。
Node 节点:跳表中的节点,包含键值对和指向其他节点的链接。
共享资源:表示存储在 ConcurrentSkipListMap
中的键值对数据。
读操作:线程可以并发地读取 ConcurrentSkipListMap
中的数据。
写操作:线程可以并发地修改 ConcurrentSkipListMap
中的数据。
CAS 操作:在更新节点链接或修改数据时,使用 CAS 操作来保证线程安全。
自旋锁/同步块:在某些情况下,如果 CAS 操作失败,可能会使用自旋锁或同步块来确保操作的原子性。
操作流程:
读操作:
线程通过索引层快速定位到数据层的节点。
线程使用 volatile
读取节点的值,确保内存可见性。
写操作:
线程在更新或添加节点时,首先尝试使用 CAS 操作。
如果 CAS 操作失败,线程可能会使用自旋锁或同步块来确保原子性。
综合说明:
作用:ConcurrentSkipListMap
是一种线程安全的有序映射,它通过使用跳表(Skip List)数据结构来支持高效的并发访问和排序操作。背景:在需要高效并发访问和保持元素有序的场景中,传统的 TreeMap
由于其加锁策略在高并发环境下性能受限,ConcurrentSkipListMap
提供了一种替代方案。优点:
高性能并发访问:通过跳表结构和细粒度锁定,实现了高效的并发读取和更新。
有序性:保持元素的有序性,支持范围查询等操作。
动态调整:可以根据访问模式动态调整结构,优化性能。缺点:
内存占用:相比无序的 ConcurrentHashMap
,由于维护了有序性,内存占用可能更高。
复杂性:实现相对复杂,涉及多级索引和节点的管理。场景:适用于需要有序数据且高并发访问的场景,如实时数据索引、范围查询等。业务举例:在一个金融市场分析系统中,需要维护一个实时更新的价格索引,ConcurrentSkipListMap
可以用来存储和快速检索各种金融工具的当前价格。
使用方式:
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
public class ConcurrentSkipListMapDemo {
// 创建一个 ConcurrentSkipListMap 实例
private final ConcurrentSkipListMap<Integer, String> map = new ConcurrentSkipListMap<>();
// 将一个键值对插入到 Map 中
public void put(Integer key, String value) {
// put 方法是线程安全的
map.put(key, value);
}
// 从 Map 中获取与指定键关联的值
public String get(Integer key) {
// get 方法是线程安全的
return map.get(key);
}
// 获取 Map 的键集合
public java.util.NavigableSet<Integer> keySet() {
// keySet 方法返回 Map 的键集合视图
return map.keySet();
}
// 获取 Map 的值集合
public java.util.Collection<String> values() {
// values 方法返回 Map 的值集合视图
return map.values();
}
// 获取 Map 的大小
public int size() {
// size 方法是线程安全的
return map.size();
}
// 演示删除操作
public void remove(Integer key) {
// remove 方法是线程安全的
map.remove(key);
}
public static void main(String[] args) {
ConcurrentSkipListMapDemo demo = new ConcurrentSkipListMapDemo();
// 插入一些数据
demo.put(1, "One");
demo.put(2, "Two");
demo.put(3, "Three");
// 获取并打印一条数据
System.out.println("Value for key 2: " + demo.get(2));
// 获取 Map 的大小
System.out.println("Map size: " + demo.size());
// 获取并打印所有键
System.out.println("All keys: " + demo.keySet());
// 删除一条数据
demo.remove(2);
// 再次获取 Map 的大小
System.out.println("Map size after removal: " + demo.size());
// 获取并打印所有值
System.out.println("All values: " + demo.values());
}
}
复制代码
业务代码案例:
业务说明: 实时股票交易系统需要维护一个动态变化的股票价格索引,该索引需要根据实时的市场数据进行更新,并且允许多个交易线程并发地读取和更新股票价格。此外,系统还需要定期根据价格波动进行调整,如计算价格的平均值、执行价格范围查询等。
为什么需要 ConcurrentSkipListMap
技术: ConcurrentSkipListMap
是一个线程安全的有序映射,它允许高效的范围查询和有序访问,这对于股票价格索引来说至关重要。由于股票价格会频繁更新,且需要快速响应市场变化,使用 ConcurrentSkipListMap
可以提供高效的插入、删除和查找操作,同时保持数据的有序性。
没有 ConcurrentSkipListMap
技术会带来什么后果:
没有使用 ConcurrentSkipListMap
或其他适合有序并发操作的数据结构可能会导致以下问题:
性能瓶颈:如果使用 HashMap
或 ConcurrentHashMap
,虽然可以实现并发更新,但无法高效执行有序操作和范围查询,可能导致查询性能不佳。
数据不一致:在高并发更新的情况下,如果没有适当的同步机制,可能会导致价格信息的不一致。
复杂性增加:如果使用 synchronized
列表或数组来维护价格索引,可能需要手动管理复杂的同步和排序逻辑,增加系统复杂性和出错的风险。
代码实现:
import java.util.concurrent.ConcurrentSkipListMap;
public class StockPriceIndex {
private final ConcurrentSkipListMap<String, Double> priceIndex = new ConcurrentSkipListMap<>();
public void updatePrice(String stockSymbol, Double newPrice) {
// 更新股票价格
priceIndex.put(stockSymbol, newPrice);
}
public Double getPrice(String stockSymbol) {
// 获取股票价格
return priceIndex.get(stockSymbol);
}
public void removeStock(String stockSymbol) {
// 移除股票信息
priceIndex.remove(stockSymbol);
}
public ConcurrentSkipListMap<String, Double> headMap(String toKey) {
// 获取指定范围内的股票价格索引
return priceIndex.headMap(toKey);
}
public static void main(String[] args) {
StockPriceIndex index = new StockPriceIndex();
index.updatePrice("AAPL", 150.00);
index.updatePrice("GOOGL", 2750.50);
index.updatePrice("MSFT", 250.00);
System.out.println("Price of AAPL: " + index.getPrice("AAPL"));
System.out.println("Price of GOOGL: " + index.getPrice("GOOGL"));
// 获取所有小于 "MSFT" 的股票价格索引
ConcurrentSkipListMap<String, Double> subMap = index.headMap("MSFT");
subMap.forEach((k, v) -> System.out.println("Stock: " + k + ", Price: " + v));
}
}
复制代码
2.11. ConcurrentLinkedQueue
ConcurrentLinkedQueue
是 Java 中一个线程安全的无锁队列,它使用 CAS (Compare-And-Swap) 操作来保证线程安全。
图解说明:
Java 线程:表示运行中的线程,它们可能需要对 ConcurrentLinkedQueue
进行入队或出队操作。
ConcurrentLinkedQueue 实例:是 ConcurrentLinkedQueue
类的实例,用于存储队列中的元素并提供线程安全的访问。
Node 节点结构:ConcurrentLinkedQueue
使用内部的 Node
类来存储队列中的每个元素。每个节点包含队列中的一个元素和指向下一个节点的链接。
虚拟头节点:队列使用一个虚拟头节点来简化出队操作。虚拟头节点不存储实际的队列元素。
虚拟尾节点:队列使用一个虚拟尾节点来简化入队操作。虚拟尾节点指向队列中的最后一个节点。
队列元素:表示存储在队列中的实际数据。
入队操作:线程将新元素添加到队列尾部的过程,通过 CAS 更新虚拟尾节点的链接。
出队操作:线程从队列头部移除元素的过程,通过 CAS 更新虚拟头节点的链接。
CAS 操作:ConcurrentLinkedQueue
使用 CAS 操作来更新节点之间的链接,从而实现无锁的线程安全队列。
自旋等待:在 CAS 操作失败时,线程可能会自旋等待直到操作成功。
操作流程:
入队操作:线程通过 CAS 操作将新节点插入到队列尾部,并更新尾节点指针。
出队操作:线程通过 CAS 操作移除队列头部的节点,并更新头节点指针。
CAS 操作:在入队和出队过程中,线程使用 CAS 来保证节点链接的原子性更新。
综合说明:
作用:ConcurrentLinkedQueue
是一种基于链接节点的无界线程安全队列,支持高并发的入队和出队操作。背景:在多线程环境中,需要一种高效的队列来处理任务或消息传递,ConcurrentLinkedQueue
提供了一种无锁的解决方案。优点:
无锁设计:利用 CAS 操作实现无锁的线程安全队列,提高了并发性能。
简单高效:提供了简单的入队和出队操作,适合作为任务队列或消息传递队列。
无界队列:理论上队列大小无界,适用于处理大量任务。缺点:
可能的内存消耗:由于是无界队列,在极端情况下可能会消耗大量内存。
性能限制:在某些高竞争场景下,CAS 操作可能导致性能瓶颈。场景:适用于作为任务队列或消息传递队列,支持高并发的入队和出队操作。业务举例:在一个分布式计算系统中,ConcurrentLinkedQueue
可以用于收集各个计算节点的输出结果,然后由一个或多个消费者线程进行处理。
这两个并发集合类在 Java 中提供了强大的工具,以支持复杂的并发数据处理需求,它们各自适用于不同的应用场景,可以根据具体需求选择合适的并发集合。kedQueue
使用方式:
import java.util.concurrent.ConcurrentLinkedQueue;
public class ConcurrentLinkedQueueDemo {
// 创建一个 ConcurrentLinkedQueue 实例
private final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
// 向队列中添加元素
public void add(int number) {
// add 方法是线程安全的
queue.add(number);
System.out.println("Added " + number);
}
// 从队列中获取并移除元素
public Integer poll() {
// poll 方法是线程安全的,返回并移除队列头部的元素
Integer result = queue.poll();
if (result != null) {
System.out.println("Polled " + result);
} else {
System.out.println("Queue is empty");
}
return result;
}
// 查看队列头部的元素但不移除
public Integer peek() {
// peek 方法是线程安全的,返回队列头部的元素但不移除
Integer result = queue.peek();
if (result != null) {
System.out.println("Peeked " + result);
} else {
System.out.println("Queue is empty");
}
return result;
}
// 获取队列的大小
public int size() {
// size 方法估算队列的大小
int size = queue.size();
System.out.println("Queue size: " + size);
return size;
}
public static void main(String[] args) {
ConcurrentLinkedQueueDemo demo = new ConcurrentLinkedQueueDemo();
// 启动生产者线程
Thread producerThread = new Thread(() -> {
demo.add(1);
demo.add(2);
demo.add(3);
});
// 启动消费者线程
Thread consumerThread = new Thread(() -> {
demo.poll();
demo.poll();
demo.poll();
demo.poll(); // 这次调用应该会返回 null,因为队列已空
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
// 在所有线程完成后获取队列大小
demo.size();
}
}
复制代码
业务代码案例:
业务说明: 大规模日志处理系统需要从多个源实时收集、存储并分析日志数据。这些日志数据通常由分布在不同服务器上的应用程序生成,并且需要被快速地处理以避免数据丢失或延迟问题。
为什么需要 ConcurrentLinkedQueue
技术: 在日志处理场景中,日志数据的产生速度往往非常快,且来源众多,因此需要一个高效且线程安全的队列来缓存这些日志数据。ConcurrentLinkedQueue
提供了高吞吐量和低延迟的并发访问,无需使用锁,使得它特别适合用作日志数据的缓冲区。此外,由于 ConcurrentLinkedQueue
是无界的,因此不会阻塞生产者线程,即使在高负载情况下也能保持高性能。
没有 ConcurrentLinkedQueue
技术会带来什么后果: 没有使用 ConcurrentLinkedQueue
或其他高效的并发队列可能会导致以下问题:
数据丢失:如果使用有界队列且没有适当的生产者速率控制,可能会因为队列满导致日志数据丢失。
性能瓶颈:如果使用锁或其他同步机制来保护共享队列,可能会导致性能瓶颈,尤其是在高并发场景下。
系统不稳定:在高负载情况下,如果队列处理速度跟不上数据产生速度,可能会导致系统崩溃或重启。
代码实现:
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class LogProcessor {
private final ConcurrentLinkedQueue<String> logQueue = new ConcurrentLinkedQueue<>();
private final ExecutorService processorPool = Executors.newFixedThreadPool(10);
public void log(String message) {
// 生产者线程调用此方法来添加日志到队列
logQueue.add(message);
}
public void startLogProcessing() {
// 消费者线程池,用于处理队列中的日志
processorPool.submit(() -> {
while (true) {
try {
// 消费者线程调用此方法来处理队列中的日志
String logEntry = logQueue.poll();
if (logEntry != null) {
processLog(logEntry);
} else {
TimeUnit.MILLISECONDS.sleep(100); // 避免 CPU 过载
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
}
private void processLog(String logEntry) {
// 实际处理日志的逻辑
System.out.println("Processing log: " + logEntry);
}
public static void main(String[] args) {
LogProcessor logProcessor = new LogProcessor();
logProcessor.startLogProcessing();
// 多个生产者线程生成日志
for (int i = 0; i < 100; i++) {
int finalI = i;
new Thread(() -> {
logProcessor.log("Log entry " + finalI);
}).start();
}
}
}
复制代码
2.12. BlockingQueue
BlockingQueue
是 Java 中用于线程间通信的队列,支持阻塞操作,当队列为空时,获取元素的操作会阻塞;当队列满时,插入元素的操作会阻塞。
图解说明:
Java 线程:表示运行中的线程,它们可能需要向队列中添加或移除元素。
BlockingQueue 实例:是 BlockingQueue
接口的具体实现,如 ArrayBlockingQueue
、LinkedBlockingQueue
等,用于线程间通信。
内部数据结构:表示 BlockingQueue
内部用于存储元素的数据结构,如数组、链表等。
队列容量:表示 BlockingQueue
的最大容量,如果队列有界,则插入操作在队列满时会阻塞。
等待区(元素) :表示当队列为空时,等待获取元素的线程集合。
等待区(空间) :表示当队列满时,等待空间释放的线程集合。
元素添加操作:表示向 BlockingQueue
中添加元素的操作,如果队列满,则操作会阻塞。
元素移除操作:表示从 BlockingQueue
中移除元素的操作,如果队列为空,则操作会阻塞。
综合说明:
作用:BlockingQueue
是一个线程安全的队列,支持阻塞操作,当队列为空时,获取元素的操作会阻塞;当队列满时,插入元素的操作会阻塞。
背景:在生产者-消费者模型中,需要一种机制来协调生产者和消费者之间的操作,BlockingQueue
提供了这种协调。
优点:
线程协调:自然地实现了生产者-消费者之间的线程协调。
阻塞操作:提供了阻塞获取和阻塞插入的方法,简化了并发编程。
缺点:
可能的死锁:不当使用可能导致死锁,例如一个线程永久阻塞等待一个不会到来的元素。
性能考虑:在高并发环境下,队列的容量和锁策略需要仔细调优。
场景:适用于生产者-消费者场景,如任务分配、资源池管理等。
业务举例:在消息处理系统中,BlockingQueue
可以用于缓存待处理的消息,生产者线程生成消息并放入队列,消费者线程从队列中取出并处理消息,确保了消息的顺序性和系统的响应性。
使用方式:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class BlockingQueueDemo {
// 创建一个 LinkedBlockingQueue 实例,容量限制为10
private final BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<>(10);
// 向 BlockingQueue 中添加元素
public void produce(Integer element) throws InterruptedException {
// put 方法在队列满时阻塞,直到队列中有空间
blockingQueue.put(element);
System.out.println("Produced: " + element);
}
// 从 BlockingQueue 中获取元素
public Integer consume() throws InterruptedException {
// take 方法在队列空时阻塞,直到队列中有元素
Integer element = blockingQueue.take();
System.out.println("Consumed: " + element);
return element;
}
// 获取 BlockingQueue 的大小
public int size() {
// size 方法返回队列当前的元素数量
return blockingQueue.size();
}
public static void main(String[] args) throws InterruptedException {
BlockingQueueDemo demo = new BlockingQueueDemo();
// 创建生产者线程
Thread producerThread = new Thread(() -> {
try {
for (int i = 0; i < 15; i++) {
demo.produce(i);
Thread.sleep(100); // 生产延时
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// 创建消费者线程
Thread consumerThread = new Thread(() -> {
try {
for (int i = 0; i < 15; i++) {
int element = demo.consume();
Thread.sleep(150); // 消费延时
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
// 打印最终队列的大小
System.out.println("Final queue size: " + demo.size());
}
}
复制代码
业务代码案例:
业务说明: 消息队列系统在微服务架构中用于异步处理任务,例如发送邮件、短信通知等。这些服务通常由独立的服务实例处理,以提高系统的响应性和可扩展性。消息队列需要能够处理高并发的消息生产和消费,确保消息的可靠传递。
为什么需要 BlockingQueue
技术: BlockingQueue
提供了一种有效的机制来处理生产者-消费者场景,特别是在面对高并发和需要线程安全时。它能够使生产者在队列满时阻塞,消费者在队列空时阻塞,从而平衡生产和消费的速度,确保系统的稳定性和消息的不丢失。
没有 BlockingQueue
技术会带来什么后果:
没有使用 BlockingQueue
或其他并发队列可能会导致以下问题:
消息丢失:在高并发情况下,如果没有适当的机制来控制消息的产生和消费,可能会导致消息丢失。
系统过载:如果没有流控机制,生产者可能会过快地生成消息,导致系统资源耗尽,甚至崩溃。
数据不一致:在多线程环境下,如果不正确地管理消息的访问,可能会导致数据处理的不一致性。
代码实现:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class MessageQueueSystem {
private final BlockingQueue<Message> messageQueue = new LinkedBlockingQueue<>();
public void produceMessage(String content) throws InterruptedException {
// 将消息添加到队列中,如果队列满了,生产者线程将被阻塞
messageQueue.put(new Message(content));
System.out.println("Message produced: " + content);
}
public Message consumeMessage() throws InterruptedException {
// 从队列中取出消息,如果队列空了,消费者线程将被阻塞
Message message = messageQueue.take();
System.out.println("Message consumed: " + message.getContent());
return message;
}
public static void main(String[] args) throws InterruptedException {
MessageQueueSystem messageQueueSystem = new MessageQueueSystem();
// 创建生产者线程
Thread producerThread = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
messageQueueSystem.produceMessage("Message " + i);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 创建消费者线程
Thread consumerThread = new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
messageQueueSystem.consumeMessage();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
}
}
class Message {
private final String content;
public Message(String content) {
this.content = content;
}
public String getContent() {
return content;
}
}
复制代码
2.13. Condition
Condition
是 Java 中 java.util.concurrent.locks
包提供的一个接口,它用于实现等待/通知机制。Condition
通常与 Lock
接口配合使用,允许一个或多个线程在某些条件满足之前挂起,并在条件满足时被唤醒。
图解说明:
Java 线程:表示运行中的线程,它们可能需要在某些条件满足之前挂起。
Lock 实例:是 Lock
接口的具体实现,如 ReentrantLock
,用于控制对共享资源的访问。
Condition 实例:是 Condition
接口的具体实现,与 Lock
实例配合使用,用于线程间的等待/通知机制。
等待队列(线程) :当线程调用 Condition
的 await()
方法时,如果条件不满足,线程会被放入等待队列。
共享资源:表示被多个线程共享的数据,需要通过 Lock
和 Condition
来保护以确保线程安全。
条件检查:表示线程在尝试获取资源之前需要检查的条件。
唤醒信号:当条件满足时,其他线程会发送唤醒信号给等待队列中的线程。
锁状态:表示锁的当前状态,如是否被锁定,以及锁定的线程等。
操作流程:
锁定:线程通过 Lock
实例获取锁。
条件检查:线程检查条件是否满足。
等待:如果条件不满足,线程调用 Condition
的 await()
方法,释放锁并进入等待队列。
唤醒:当条件满足时,其他线程调用 Condition
的 signal()
或 signalAll()
方法,发送唤醒信号给等待队列中的线程。
重新竞争锁:被唤醒的线程重新竞争锁。
再次检查条件:线程在重新获得锁后,再次检查条件是否满足,如果满足则继续执行。
综合说明:
作用:Condition
是与 Lock
接口配合使用的同步辅助工具,它允许一个或多个线程等待,直到被其他线程唤醒。
背景:在复杂的同步场景中,需要更细粒度的控制线程的等待和唤醒,Condition
提供了这种能力。
优点:
细粒度控制:提供了比 Object.wait()
/Object.notify()
更灵活的线程间协调机制。
多条件支持:一个锁可以关联多个条件,每个条件可以独立唤醒等待的线程。
缺点:
使用复杂:需要与 Lock
一起使用,增加了编程复杂度。
错误使用可能导致死锁或线程饥饿。
场景:适用于需要线程间复杂协调的场景,如任务调度、资源分配等。
业务举例:在酒店预订系统中,Condition
可以用于实现房间状态的等待和通知机制。当房间变为空闲时,等待的顾客可以被通知并进行预订。
使用方式:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedBuffer {
private final Object[] buffer;
private int putPtr, takePtr, count;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public BoundedBuffer(int size) {
buffer = new Object[size];
}
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == buffer.length) { // 等待直到缓冲区非满
notFull.await();
}
buffer[putPtr] = x;
putPtr = (putPtr + 1) % buffer.length;
count++;
notEmpty.signal(); // 通知可能等待的消费者
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0) { // 等待直到缓冲区非空
notEmpty.await();
}
Object x = buffer[takePtr];
takePtr = (takePtr + 1) % buffer.length;
count--;
notFull.signal(); // 通知可能等待的生产者
return x;
} finally {
lock.unlock();
}
}
}
public class ProducerConsumerDemo {
private final BoundedBuffer buffer;
public ProducerConsumerDemo(int size) {
buffer = new BoundedBuffer(size);
}
public void produce(String item) {
buffer.put(item);
}
public String consume() {
return (String) buffer.take();
}
public static void main(String[] args) throws InterruptedException {
final int SIZE = 10;
final ProducerConsumerDemo demo = new ProducerConsumerDemo(SIZE);
// 生产者线程
Thread producerThread = new Thread(() -> {
for (int i = 0; i < 20; i++) {
demo.produce("Item " + i);
try {
Thread.sleep(100); // 生产延时
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
// 消费者线程
Thread consumerThread = new Thread(() -> {
for (int i = 0; i < 20; i++) {
String item = demo.consume();
System.out.println("Consumed: " + item);
try {
Thread.sleep(150); // 消费延时
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
producerThread.start();
consumerThread.start();
producerThread.join();
consumerThread.join();
}
}
复制代码
业务代码案例:
业务说明: 任务调度系统负责管理和执行定时任务。这些任务可能包括数据备份、报告生成、系统维护等。系统需要能够按预定时间触发任务,并确保任务在执行时不会相互干扰。
为什么需要 Condition
技术: 在任务调度系统中,任务的触发通常依赖于时间,而任务的执行可能需要等待特定条件满足。Condition
配合 Lock
使用,可以在没有任务可执行时让调度器线程等待,直到有任务准备好执行。这种机制允许系统在没有任务执行需求时保持空闲,从而节省资源。
没有 Condition
技术会带来什么后果:
没有使用 Condition
或其他等待/通知机制可能会导致以下问题:
资源浪费:如果调度器不断轮询检查新任务,可能会浪费大量 CPU 资源。
响应性差:在新任务到来时,如果没有有效的机制来唤醒调度器,可能会导致任务执行延迟。
代码复杂度:没有 Condition
,可能需要使用更复杂的多线程同步机制,增加了代码的复杂性和出错的风险。
代码实现:
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.Date;
import java.util.LinkedList;
import java.util.Queue;
public class TaskScheduler {
private final ReentrantLock lock = new ReentrantLock();
private final Condition taskAvailable = lock.newCondition();
private final Queue<Runnable> tasks = new LinkedList<>();
public void schedule(Runnable task, long delay) {
lock.lock();
try {
tasks.add(() -> {
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
task.run();
});
taskAvailable.signal(); // 通知调度器有新任务
} finally {
lock.unlock();
}
}
public void startScheduling() {
new Thread(this::runScheduler).start();
}
private void runScheduler() {
lock.lock();
try {
while (true) {
while (tasks.isEmpty()) { // 如果没有任务,等待
taskAvailable.await();
}
Runnable task = tasks.poll();
task.run();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
TaskScheduler scheduler = new TaskScheduler();
scheduler.schedule(() -> System.out.println("Task 1 executed at " + new Date()), 2000);
scheduler.schedule(() -> System.out.println("Task 2 executed at " + new Date()), 4000);
scheduler.startScheduling();
}
}
复制代码
评论