使用 Java 客户端发送消息和消费的应用
体验链接:https://developer.aliyun.com/adc/scenario/fb1b72ee956a4068a95228066c3a40d6
实验简介
本教程将 Demo 演示使用 java 客户端发送消息和消费的应用场景
实验实操
第 1 节 如何发送和消费并发消息
并发消息,也叫普通消息,是相对顺序消息而言的,普通消息的效率最高。本教程将简单演示如何使用纯 java client 发送和消费消息。
1. 下载 java 代码 demo(已下载则忽略操作)
2. 打包,执行代码 demo
再执行命令, 可以看到正常生产和消费输出
3. Demo 代码说明
Demo 代码可以查看 github。并发消息,意思是生产者可以并发的向 topic 中发送消息, 消费端不区分顺序的消息,这种模式效率最好。生产者 demo 代码如下:
最后留一个思考题给大家: 生产者实例和消费者实例, 都是线程安全的吗?
第 2 节 如何发送和消费顺序消息
顺序消息分为分区有序和全局有序。生产消费代码都是一样的, 区别在于分区有序的 topic 中 queue 个数可以是任意有效值,全局有序的 topic 要求 queue 的个数为 1。顺序消息的实现非常简单易懂,但牺牲了可用性,单节点故障会直接影响顺序消息。
什么是分区有序消息,什么场景应该使用呢,又该如何发送分区有序消息?分区有序表示在一个 queue 中的消息是有序的,发送消息时设置设置了相同 key 的消息会被发送到同一个 queue 中。
本教程将简单演示如何使用纯 java client 发送和消费顺序消息。
1. 下载 java 代码 demo(已下载则忽略操作)
2. 打包,执行代码 demo
再执行命令, 可以看到正常生产和消费输出。 消费输出注意看相同 queue id 的消息输出内容中的数字,按照从小到大就是正确的。
3. Demo 代码说明
Demo 代码可以查看 github。
生产者说明
生产者会根据设置的 keys 做 hash,相同 hash 值的消息会发送到相同的 queue 中。所以相同 hash 值的消息需要保证在同一个线程中顺序的发送。
消费者说明
消费者使用相对比较简单, 消息监听类实现 org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly 接口即可。相同 queue 的消息需要串行处理,这样救保证消费的顺序性
第 3 节 如何发送和消费延迟消息
延迟消息,对于一些特殊场景比如订票后 30 分钟不支付自动取消等类似场景比较有用。本教程将简单演示如何使用纯 java client 发送和消费延迟消息。
1. 下载 java 代码 demo(已下载则忽略操作)
2. 打包,执行代码 demo
执行命令, 可以看到正常生产和消费输出。 目前 RocketMQ 支持多种延迟级别 , 不过每种延迟级别都是基于 RocketMQ 自身,实际延迟时间会加上 Broker-Client 端的网络情况不同而略有差异。
3. Demo 代码说明
Demo 代码可以查看 github。
生产者说明
生产者在发送消息的时候需要设置延迟级别,RocketMQ 支持多种延迟级别。如果把延迟时间算作一个以空格分割的数组,延迟级别就是延迟时间数组的下标 index+1。RocketMQ 如何解析延迟级别和延迟时间映射关系。
消费者说明:消费者按照并发消息消费即可
第 4 节 如何发送和消费事务消息
事务消息,是 RocketMQ 解决分布式事务的一种实现,极其简单好用。一个事物消息大致的生命周期如下图
概括为如下几个重要点:
生产者发送 half 消息(事物消息)
Broker 存储 half 消息
生产者处理本地事物,处理成功后 commit 事物
消费者消费到事物消息
本教程将简单演示如何使用纯 java client 发送和消费事物消息。
1. 下载 java 代码 demo(已下载则忽略操作)
2. 打包,执行代码 demo
执行命令, 可以看到事物消息的全部过程。
3. Demo 代码说明
Demo 代码可以查看 github。在事物消息中,消费代码和普通消息的消费一样,主要代码在生产者端。
生产者端的主要代码包含 3 个步骤:
初始化生产者,设置回调线程池、设置本地事物处理监听类。
这里注意事物消息的生产者类是: org.apache.rocketmq.client.producer.TransactionMQProducer, 而不是普通生产者类。
事物监听类需要实现 2 个方法,这里的逻辑都是 mock 的,实际使用的时候需要根据实际修改。
发送事物消息。调用 sendMessageInTransaction () 方法发送事物消息, 而不是以前的 send () 方法。
第 5 节 生产者消费者如何同步发送、消费消息(Request-Reply)
request-reply 模式,可以满足目前类似 RPC 同步调用的场景,本教程将简单演示如何使用该模式。
1. 下载 java 代码 demo(已下载则忽略操作)
2. 打包,执行代码 demo
执行命令, 可以看到正常生产和消费输出。
通过代码结果和代码比较, 我们得知 request-reply 类似 RPC 同步调用的效果。
个人觉得:需要同步调用就用 RPC, 不要走 MQ,毕竟两者是完全不同的目标的产品,专业的事情交给专业的产品。
3. Demo 代码说明
Demo 代码可以查看 github。
request-reply 模式,在生产者和消费者两端都和一般的生产消费有区别,下面分别介绍下 demo 代码。
生产者 demo 主要代码,主要区别在于调用 request (),而不是 send () 方法。
消费者 demo 主要代码:消费代码主要增加了 “回复” 逻辑。回复是利用消息发送直接向生产者发送一条消息。 有点类似事物消息中 broker 回查生产者。
一个小问题:事物消息和 request-reply 消息时,生产者的生产者组名有什么要求嘛?
第 6 节 如何有选择性的消费消息
有时候我们只想消费部分消息, 当然全部消费,在代码中过滤。 假如消息海量时, 会有很多资源浪费,比如浪费不必要的带宽。我们可以通过 tag,sql92 表达式来选择性的消费。
进入 broker 目录
编辑配置文件,修改 broker 配置项 2 个
配置项值:
修改后:
重启 broker
1. 下载 java 代码 demo(已下载则忽略操作)
2. 打包,执行 tag 过滤代码 demo
执行命令, 可以看到正常生产和消费输出。
3. 执行 sql 过滤代码 demo
执行命令, 可以看到正常生产和消费输出。
4. Demo 代码说明
Demo 代码可以查看 github。以下分别介绍生产者和消费者主要 demo 代码。
生产者
在生产 tag 消息的时候, 消息中需要加上发送 tag;sql92 过滤的时候,加上自定义 k-v。
消费者
tag 过滤消费时,在订阅 topic 时, 也添加上 tag 订阅
SQL92 过滤时,添加上 SQL 过滤订阅。至于 SQL92 除了等号,还是支持什么,大家可以自行自行查看或者到群里问。
第 7 节 如何使用 ACL 客户端生产消费消息
ACL,全称是 Access Control List,是 RocketMQ 设计来做访问和权限控制的。更多文档参见 github wiki:https://github.com/apache/rocketmq/wiki/RIP-5-RocketMQ-ACL
0. 启动一个集群
进入 broker 目录
编辑配置文件,修改 broker 配置项 1 个
配置项值:
修改后:
重启 broker
1. 下载 java 代码 demo(已下载则忽略操作)
2. 打包,执行代码 demo
执行命令, 可以看到正常生产和消费输出。 demo 代码使用的 admin 权限发送和消费,实际使用需要对于每个 topic,消费者组授权,才能正常生产消费。
3. Demo 代码说明
Demo 代码可以查看 github。带 ACL 的生产者和消费者在初始化的时候,都必须给一个 hook 实例,构建方法如下:
在 broker 端 secret key 用来校验信息的完整性, access key 用来校验用户权限。二者缺一不可。
评论