RocketMQ - 如何实现顺序消息
顺序消息的使用场景
日常项目中需要保证顺序的应用场景非常多,比如交易场景中的订单创建、支付、退款等流程,先创建订单才能支付,支付完成的订单才能退款,这需要保证先进先出。又例如数据库的BinLog消息,数据库执行新增语句、修改语句,BinLog消息得到顺序也必须保证是新增消息、修改消息。
如何发送和消费顺序消息
我们使用RocketMQ顺序消息来模拟一下订单的场景,顺序消息分为两部分:顺序发送、顺序消费。
顺序发消息
上面代码模拟了按顺序依次发送创建、支付、退款消息到TopicTest中。在application.properties配置文件中指定producer.sync=true,默认是异步发送,此处改为同步发送。
MessageBuilder设置Header信息头,表示这是一条顺序消息,将消息固定地发送到第0个消息队列。
顺序收消息
程序运行后,可以在控制台看到日志输出,也是按照顺序打印出来的
顺序发送的技术原理
RocketMQ的顺序消息分为2种情况:局部有序和全局有序。前面的例子是局部有序场景。
局部有序:指发送同一个队列的消息有序,可以在发送消息时指定队列,在消费消息时也按顺序消费。例如同一个订单ID的消息要保证有序,不同订单的消息没有约束,相互不影响,不同订单ID之间的消息时并行的。
全局有序:设置Topic只有一个队列可以实现全局有序,创建Topic时手动设置。此类场景极少,性能差,通常不推荐使用。
RocketMQ中消息发送有三种方式:同步、异步、单项。
同步:发送网络请求后会同步等待Broker服务器返回结果,支持发送失败重试,适用于比较重要的消息通知场景。
异步:异步发送网络请求,不会阻塞当前线程,不支持失败重试,适用于对响应时间要求更高的场景。
单向:单向发送原理和异步一致,但不支持回调。适用于响应时间非常端,对可靠性要求不高的场景,例如日志收集。
顺序消息发送的原理比较简单,同一类消息发送到相同的队列即可。为了保证先发送的消息先存储到消息队列,必须使用同步发送的方式,否则可能出现先发送的消息后到消息队列中,此时消息就乱序了。
RocketMQ的核心代码如下:
选择队列的过程由messageQueueSelector和hashKey在实现类SelectMessageQueueByHash中完成
根据hashKey计算hash值,hashKey是我们前面例子中订单ID,因此相同订单ID的hash值相同。
用hash值和队列数mqs.size()取模,得到一个索引值,结果小于队列数。
根据索引值从队列列表中取出一个队列mqs.get(value),hash值相同则队列相同。
在队列列表的获取过程中,由Producer从NameServer根据Topic查询Broker列表,缓存在本地内存中,以便下次从缓存中读取。
普通发送的技术原理
RocketMQ中除了顺序消息外,还支持事务消息和延迟消息,非这三种特殊的消息称为普通消息。日常开发中最常用的是普通消息,这是因为最常用的场景就是系统间的异步解耦和流量的削峰填谷,这些场景下尽量保证消息高性能收发即可。
从普通消息与顺序消息的对比来看,普通消息在发送时选择消息队列的策略不同。普通消息发送选择队列有两种机制:轮询机制和故障规避机制。默认使用轮询机制,一个Topic有多个队列,轮询选择其中一个队列。
轮询机制的原理是路由信息TopicPublishInfo中维护了一个计数器sendWhichQueue,每发送一次消息需要查询一次路由,计算器就进行“+1”,通过计数器的值index与队列的数量取模计算来实现轮询算法。
轮询算法简单好用,但是有个弊端,如果轮询选择的队列是在宕机的Broker上,会导致消息发送失败,即使消息发送重试的时候重新选择队列,也可能还是在宕机的Broker上,无法规避发送失败的情况,因此就有了故障规避机制。
顺序消费的技术原理
RocketMQ支持两种消费模式:集群消费和广播消费。两者的区别是,在广播消费模式下每条消息会被ConsumerGroup的每个Consumer消费,在集群消费模式下每条消息只会被ConsumerGroup的一个Consumer消费。
多数场景都使用集群消费,消息每次消费代表一次业务处理,集群消费表示每条消息由业务应用集群中任意一个服务实例来处理。少数场景使用广播消费,例如数据发生变化,更新业务应用集群中每个服务的本地缓存,这就需要一条消息被整个集群都消费一次,默认是集群消费。
顺序消费也叫做有序消费,原理是同一个消息队列只允许Consumer中的一个消费线程拉取消费,Consumer中有个消费线程池,多个线程会同时消费消息。在顺序消费的场景下消费线程请求到Broker时会先申请独占锁,获得锁的请求则允许消费。
消息消费成功后,会向Broker提交消费进度,更新消费位点信息,避免下次拉取到已消费的消息,顺序消费中如果消费线程在监听器中进行业务处理时抛出异常,则不会提交消费进度,消费进度会阻塞在当前这条消息,并不会继续消费该队列中的后续消息,从而保证顺序消费。
在顺序消费的场景下,特别需要注意对异常的处理,如果重试也失败,会一直阻塞在当前消息,直到超出最大重试次数,从而在很长一段时间内无法消费后续消息造成队列消息堆积。
并发消费的原理
RocketMQ支持两种消费方式:顺序消费和并发消费。并发消费是默认的消费方式,日常开发过程中最常用的方式,除了顺序消费就是并发消费。
并发消费也称为乱序消费,其原理是同一个消息队列提供给Consumer中的多个消费线程拉取消费。Consumer中会维护一个消费线程池,多个消费线程可以并发去同一个消息队列中拉取消息进行消费。如果某个消费线程在监听器中进行业务处理时抛出异常,当前线程会进行重试,不影响其它消费线程和消费队列的消费进度,消费成功的线程正常提交消费进度。
并发消费相比于顺序消费没有资源争抢上锁的过程,消费消息的速度比顺序消费要快很多。
消息的幂等性
说到消息消费不得不提到消息的幂等性,业务代码中通常收到一条消息进行一次业务逻辑处理,如果一条相同的消息被重复收到几次,是否会导致业务重复处理?Consumer能够不重复接收消息?
RocketMQ不保证消息不被重复消费,如果业务对消息重复消费非常敏感,必须要在业务层面进行幂等性处理,具体实现可以通过分布式锁来完成。
在所有消息系统中消费消息有三种模式:at-most-once(最多一次)、at-least-once(最少一次)和exactly-only-once(精确仅一次),分布式消息系统都是在三者间取平衡,前两者是可行的并且被广泛使用。
at-most-once:消息投递后不论消息是否被消费成功,不会再重复投递,有可能会导致消息未被消费,RocketMQ未使用该方式。
at-lease-once:消息投递后,消费完成后,向服务器返回ACK,没有消费则一定不会返回ACK消息。由于网络异常、客户端重启等原因,服务器未能收到客户端返回的ACK,服务器则会再次投递,这就会导致可能重复消费,RocketMQ通过ACK来确保消息至少被消费一次。
exactly-only-once:必须下面两个条件都满足,才能认为消息是"Exactly Only Once"。 发送消息阶段,不允许发送重复消息;消费消息阶段,不允许消费重复的消息。在分布式系统环境下,如果要实现该模式,巨大的开销不可避免。RocketMQ没有保证此特性,无法避免消息重复,由业务上进行幂等性处理。
版权声明: 本文为 InfoQ 作者【Java收录阁】的原创文章。
原文链接:【http://xie.infoq.cn/article/fba37afd9bda31fb10eec651f】。文章转载请联系作者。
评论 (1 条评论)