欢迎关注,后台提供高并发历史文章整理PDF下载
背景
我们的支付场景下,要求消费的业务消息绝不能丢失,且能充分利用高规格的服务器的性能,比如用线程池对业务消息进行快速处理。有同学可能没太理解这个问题有啥不好处理,让我一步步分析下。
MQ 的优势和缺点
MQ 是我们在应对高并发场景最常用的一种措施,它可以帮我们对业务解耦、对流程异步化以及削峰填谷的妙用。
但是,由于引入了这一额外的中间件,也增加了系统的复杂度和不稳定因素。
消息可靠性的应对
消息的可靠性保证需要从消息流转的每个环节进行保障,比如生产端的事务型消息,broker 的实时刷盘持久化,消费端的手动 ACK 。
这里,我们对生产端和存储端的保障措施不作讨论,重点关注消费端的手动 ACK 机制。
手动 ACK 的问题
手动 ACK 可以保证消息一定被消费,但是需要确保手动 ACK 的顺序和消息顺序一致,为什么?
消息队列之所以性能高处理快,是因为采用了文件顺序读写方式,系统在拉取消息进行消费时,是按顺序文件的 offset 进行拉取的,如果 commit offset 的顺序错乱,会使得服务端的消息状态错乱,比如消息重发。
因此,如果我们在本地启动了线程池,对消息进行拉取处理,由于各线程的处理速度不一定一致,所以无法保证各线程处理完之后对各自消息的 ACK 操作是顺序的,怎么办,难道只能同步拉消费取然后 ACK 么。
解决方案
最不济,可以提交一批任务,批量等待统一提交。不过总觉得不优雅。
某次看 JUC 中的 AQS 的时候,启发了我。
我们平时用的类似 CountDownLauch 这些并发工具类,不也是处理的多线程协作的问题么。
我们的场景完全没有 AQS 复杂,借鉴它的思路,应该是没有问题的。
创建双端队列,队列节点中需要维护自身处理状态 state,和对应 msg 的 offset。
服务从消息中心拉取消息,在提交本地线程池执行之前,先入队列。
消息消费完之后,通知队列中对应的节点,更新状态为完成。
队列头被更新后出队列,提交 offset,并判断新的队列头的状态,直到遇到 state 是未完成的 head 时阻塞。
undefined
方案解析
该方案可以有效利用本地线程的资源,并行的处理,并通过队列和异步通知机制保证最终 commit offset 时有序。
在最差情况下(即 head 节点对应的 msg 最后一个被处理完),相当于等待一批线程处理完成后统一提交。除此之外等待性能都要更优。
异步通知的实现
public class MSGFuture {
/*全局变量,存放msg对应的future对象*/
private static final Map<Long, MSGFuture> FUTURES = new ConcurrentHashMap<Long, MSGFuture>();
/*全局不变唯一标识*/
private final long id;
/*最长等待时间*/
private final int timeout;
/*并发锁*/
private final Lock lock = new ReentrantLock();
/*通知条件*/
private final Condition done = lock.newCondition();
/*开始时间*/
private final long start = System.currentTimeMillis();
/*业务结果*/
private volatile Object response;
}
复制代码
复制代码
//构造函数
public MSGFuture(Request request, int timeout) {
/*全局自增ID*/
this.id = request.getrId();
/*超时时间*/
this.timeout = timeout > 0 ? timeout : 1000;
/*放入全局变量*/
FUTURES.put(id, this);
}
复制代码
复制代码
//业务处理结果更新
public static void received(long id, Object response) {
MSGFuture future = FUTURES.remove(id);
if (future != null) {
future.doReceived(response);
} else {
logger.warn("response return timeout,id:"+id);
}
}
复制代码
复制代码
//结果更新,通知等待条件
private void doReceived(Object res) {
lock.lock();
try {
response = res;
done.signal();
} finally {
lock.unlock();
}
}
复制代码
复制代码
//异步等待获取结果
public Object get(int timeout) throws TimeoutException {
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException();
}
}
return returnFromResponse();
}
复制代码
复制代码
总结
看到这里,有同学会说,这个和 AQS 有啥关系呀~
其实,只是处理思路的一种借鉴,比如 state 状态,比如锁机制和通知等待。既然都是多线程任务协调,那总有相似之处。
总之一句话,别说背八股文没用,多多了解会有大帮助~
任何技术相关问题,欢迎留言,讨论~
评论