1. 引言
最近公司小伙伴的服务遇到一个奇怪的丢数据问题:每天总是莫名其妙的丢几条数据,经过分析排查之后发现是没有处理好并发而导致的。
问题复盘之后我认为这是并发编程中典型的原子性问题。对于并发编程不是很熟悉的小伙伴来说是一个很好的例子。
2. 问题复盘
整个业务的逻辑其实是比较简单:不断的接收消息,定时的把收集的消息发送到一个目标地址。
2.1 关键代码
talk is cheap, show me the code!
我仿写了引起并发问题的类,只保留了核心逻辑,除了lombok
与logback
之外没有引入其他第三方包。
@Slf4j
public class ConcurrentPostResult {
private List<String> cache = new ArrayList<>(1000);
/**
* 模拟接收数据
*
* @param data
*/
public void receive(String data) {
cache.add(data);
log.info("增加数据 {}, 当前数据容量 {}", data, cache.size());
}
/**
* 模拟发送数据
*
* @throws InterruptedException
*/
public void postResult() throws InterruptedException {
log.info("当前缓存数据数量 {}", cache.size());
// 等待10ms,模拟发送数据耗时,这里实际会拷贝一份数据进行发送
TimeUnit.MILLISECONDS.sleep(10);
log.info("发送数据后的缓存数据数量 {}", cache.size());
cache.clear();
log.info("清除缓存数据 {}", cache.size());
}
public static void main(String[] args) throws InterruptedException {
ConcurrentPostResult postResult = new ConcurrentPostResult();
int threadCount = 8;
ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);
// 模拟并发接收数据
forkJoinPool.execute(() -> IntStream.range(0, 1000)
.mapToObj(String::valueOf)
.parallel().forEach(postResult::receive));
postResult.postResult();
}
}
复制代码
2.2 输出结果
00:29:46.671 [main] INFO org.example.ConcurrentPostResult - 当前缓存数据数量 0
00:29:46.678 [ForkJoinPool-1-worker-0] INFO org.example.ConcurrentPostResult - 增加数据 85, 当前数据容量 169
...省略日志
00:29:46.686 [ForkJoinPool-1-worker-5] INFO org.example.ConcurrentPostResult - 增加数据 547, 当前数据容量 616
00:29:46.686 [main] INFO org.example.ConcurrentPostResult - 发送数据后的缓存数据数量 616
00:29:46.686 [ForkJoinPool-1-worker-6] INFO org.example.ConcurrentPostResult - 增加数据 797, 当前数据容量 617
...省略日志
00:29:46.686 [ForkJoinPool-1-worker-0] INFO org.example.ConcurrentPostResult - 增加数据 57, 当前数据容量 12
00:29:46.686 [main] INFO org.example.ConcurrentPostResult - 清除缓存数据 2
复制代码
3. 问题分析
从上一节的日志输出可以看到,在执行postResult()
方法发送数据,实际上会经过一个比较长的网络 I/O 操作[^注 1]。并且执行该操作时,上游系统还在不断推送数据加入到缓存中,如下图所示:
我们进入到postResult()
方法时只发送缓存中的 10 条数据,但实际上在这个过程中可能不断有新数据加入到缓存中,这部分数据并没有发送给下游服务。
最后在发送完成之后执行了cache.clear()
操作导致数据丢失。
4. 解决
我们没有意识到这是个并发的操作,因此下意识的认为接收与发送数据都是原子性的:执行接收数据的时候不会发送数据,执行发送数据的时候不会接收数据。
比较直接方法是对缓存的对象加锁,这里有两个注意点:
所有涉及到共享对象(这里是 cache)的操作都需要加速
保证加的是同一把锁
@Slf4j
public class ConcurrentPostResult {
private List<String> cache = new ArrayList<>(1000);
/**
* 模拟接收数据
*
* @param data
*/
public void receive(String data) {
// 对缓存加锁
synchronized (cache) {
cache.add(data);
}
log.info("增加数据 {}, 当前数据容量 {}", data, cache.size());
}
/**
* 模拟发送数据
*
* @throws InterruptedException
*/
public void postResult() throws InterruptedException, IOException, ClassNotFoundException {
List data2Send;
// 执行发送操作前加锁
synchronized (cache) {
// 深拷贝数据,避免cache对象被阻塞太久,对性能造成影响
data2Send = deepCopy(cache);
log.info("当前缓存数据数量 {}, 待发送的数据数量 {}", cache.size(), data2Send.size());
cache.clear();
}
// 等待20ms,模拟发送数据耗时,这个时间基本上能保证把1000条数据消耗完
TimeUnit.MILLISECONDS.sleep(20);
log.info("发送数据后的缓存数据数量 {}, 发送的数据数量 {}", cache.size(), data2Send.size());
}
public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
ConcurrentPostResult postResult = new ConcurrentPostResult();
int threadCount = 8;
ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);
// 模拟并发接收数据
forkJoinPool.execute(() -> IntStream.range(0, 1000)
.mapToObj(String::valueOf)
.parallel().forEach(postResult::receive));
log.info("执行前");
TimeUnit.MILLISECONDS.sleep(5);
log.info("执行后");
postResult.postResult();
}
public static <T> List<T> deepCopy(List<T> src) throws IOException, ClassNotFoundException {
ByteArrayOutputStream byteOutput = new ByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(byteOutput);
out.writeObject(src);
ByteArrayInputStream byteInput = new ByteArrayInputStream(byteOutput.toByteArray());
ObjectInputStream input = new ObjectInputStream(byteInput);
List<T> dest = (List<T>) input.readObject();
return dest;
}
}
复制代码
这里我特意增加了等待时间,可以看到已发送的数据与未发送数据之和为 1000,确保数据未丢失。
21:13:17.268 [main] INFO org.example.ConcurrentPostResult - 执行前
21:13:17.274 [ForkJoinPool-1-worker-4] INFO org.example.ConcurrentPostResult - 增加数据 159, 当前数据容量 26
...省略日志
21:13:17.276 [ForkJoinPool-1-worker-3] INFO org.example.ConcurrentPostResult - 增加数据 932, 当前数据容量 157
21:13:17.277 [main] INFO org.example.ConcurrentPostResult - 执行后
21:13:17.277 [ForkJoinPool-1-worker-3] INFO org.example.ConcurrentPostResult - 增加数据 933, 当前数据容量 178
21:13:17.277 [ForkJoinPool-1-worker-7] INFO org.example.ConcurrentPostResult - 增加数据 204, 当前数据容量 176
21:13:17.288 [main] INFO org.example.ConcurrentPostResult - 当前缓存数据数量 178, 待发送的数据数量 178
21:13:17.288 [ForkJoinPool-1-worker-7] INFO org.example.ConcurrentPostResult - 增加数据 205, 当前数据容量 1
...省略日志
21:13:17.301 [ForkJoinPool-1-worker-4] INFO org.example.ConcurrentPostResult - 增加数据 874, 当前数据容量 822
21:13:17.311 [main] INFO org.example.ConcurrentPostResult - 发送数据后的缓存数据数量 822, 发送的数据数量 178
复制代码
5. 结论
本文从一个真实案例说起,分析了代码中隐藏的并发问题以及解决方案。
在这个例子中,以下几点内容是我们需要关注的:
分析你的服务是否存在并发场景
是否有对共享对象的操作(上文的例子中是cache
对象)
加锁[^注 2]可以解决并发问题中的原子性、一致性、顺序性问题
在这里我们只讨论了最直观的解决方案,有机会在后续的文章中将深入讨论 Java 内存模型来对并发问题追根溯源。
[^注 1]: 这里是让线程暂停了 10ms 模拟网络 I/O 耗时
[^注 2]:例子中用了 java 的 synchronized 关键字
评论