写点什么

借鉴 AQS 的 CHL 思路解决消息多线程消费顺序 ACK 问题

发布于: 2021 年 05 月 09 日
欢迎关注,后台提供高并发历史文章整理PDF下载

背景

我们的支付场景下,要求消费的业务消息绝不能丢失,且能充分利用高规格的服务器的性能,比如用线程池对业务消息进行快速处理。有同学可能没太理解这个问题有啥不好处理,让我一步步分析下。

MQ 的优势和缺点

MQ 是我们在应对高并发场景最常用的一种措施,它可以帮我们对业务解耦、对流程异步化以及削峰填谷的妙用。

但是,由于引入了这一额外的中间件,也增加了系统的复杂度和不稳定因素。

消息可靠性的应对

消息的可靠性保证需要从消息流转的每个环节进行保障,比如生产端的事务型消息,broker 的实时刷盘持久化,消费端的手动 ACK 。

这里,我们对生产端和存储端的保障措施不作讨论,重点关注消费端的手动 ACK 机制。

手动 ACK 的问题

手动 ACK 可以保证消息一定被消费,但是需要确保手动 ACK 的顺序和消息顺序一致,为什么?

消息队列之所以性能高处理快,是因为采用了文件顺序读写方式,系统在拉取消息进行消费时,是按顺序文件的 offset 进行拉取的,如果 commit offset 的顺序错乱,会使得服务端的消息状态错乱,比如消息重发。

因此,如果我们在本地启动了线程池,对消息进行拉取处理,由于各线程的处理速度不一定一致,所以无法保证各线程处理完之后对各自消息的 ACK 操作是顺序的,怎么办,难道只能同步拉消费取然后 ACK 么。

解决方案

最不济,可以提交一批任务,批量等待统一提交。不过总觉得不优雅。

某次看 JUC 中的 AQS 的时候,启发了我。

我们平时用的类似 CountDownLauch 这些并发工具类,不也是处理的多线程协作的问题么。

我们的场景完全没有 AQS 复杂,借鉴它的思路,应该是没有问题的。



  1. 创建双端队列,队列节点中需要维护自身处理状态 state,和对应 msg 的 offset。

  2. 服务从消息中心拉取消息,在提交本地线程池执行之前,先入队列。

  3. 消息消费完之后,通知队列中对应的节点,更新状态为完成。

  4. 队列头被更新后出队列,提交 offset,并判断新的队列头的状态,直到遇到 state 是未完成的 head 时阻塞。

    undefined

方案解析

该方案可以有效利用本地线程的资源,并行的处理,并通过队列和异步通知机制保证最终 commit offset 时有序。

在最差情况下(即 head 节点对应的 msg 最后一个被处理完),相当于等待一批线程处理完成后统一提交。除此之外等待性能都要更优。

异步通知的实现

public class MSGFuture {    /*全局变量,存放msg对应的future对象*/    private static final Map<Long, MSGFuture> FUTURES = new ConcurrentHashMap<Long, MSGFuture>();    /*全局不变唯一标识*/    private final long id;    /*最长等待时间*/    private final int timeout;    /*并发锁*/    private final Lock lock = new ReentrantLock();    /*通知条件*/    private final Condition done = lock.newCondition();    /*开始时间*/    private final long start = System.currentTimeMillis();    /*业务结果*/    private volatile Object response;}
复制代码
复制代码


//构造函数public MSGFuture(Request request, int timeout) {    /*全局自增ID*/    this.id = request.getrId();    /*超时时间*/    this.timeout = timeout > 0 ? timeout : 1000;    /*放入全局变量*/    FUTURES.put(id, this);}复制代码
复制代码


//业务处理结果更新public static void received(long id, Object response) {
        MSGFuture future = FUTURES.remove(id);        if (future != null) {            future.doReceived(response);        } else {            logger.warn("response return timeout,id:"+id);        }
    }
复制代码
复制代码


//结果更新,通知等待条件private void doReceived(Object res) {        lock.lock();        try {            response = res;            done.signal();        } finally {            lock.unlock();        }    }
复制代码
复制代码


//异步等待获取结果public Object get(int timeout) throws TimeoutException {        if (!isDone()) {            long start = System.currentTimeMillis();            lock.lock();            try {                while (!isDone()) {                    done.await(timeout, TimeUnit.MILLISECONDS);                    if (isDone() || System.currentTimeMillis() - start > timeout) {                        break;                    }                }            } catch (InterruptedException e) {                throw new RuntimeException(e);            } finally {                lock.unlock();            }            if (!isDone()) {                throw new TimeoutException();            }        }        return returnFromResponse();    }
复制代码
复制代码

总结

看到这里,有同学会说,这个和 AQS 有啥关系呀~

其实,只是处理思路的一种借鉴,比如 state 状态,比如锁机制和通知等待。既然都是多线程任务协调,那总有相似之处。

总之一句话,别说背八股文没用,多多了解会有大帮助~


任何技术相关问题,欢迎留言,讨论~


发布于: 2021 年 05 月 09 日阅读数: 10
用户头像

还未添加个人签名 2018.03.14 加入

一个工作多年的技术人,浪过京东、支付宝, 干过电商、搞过支付链路、玩过广告系统~ ,欢迎关注同名微信公众号,有任何想法问题,欢迎大伙交流讨论

评论

发布
暂无评论
借鉴AQS的CHL思路解决消息多线程消费顺序ACK问题