写点什么

使用 Java 客户端发送消息和消费的应用

  • 2022 年 7 月 15 日
  • 本文字数:4226 字

    阅读完需:约 14 分钟


体验链接:https://developer.aliyun.com/adc/scenario/fb1b72ee956a4068a95228066c3a40d6

实验简介

本教程将 Demo 演示使用 java 客户端发送消息和消费的应用场景

实验实操

第 1 节 如何发送和消费并发消息

并发消息,也叫普通消息,是相对顺序消息而言的,普通消息的效率最高。本教程将简单演示如何使用纯 java client 发送和消费消息。

1. 下载 java 代码 demo(已下载则忽略操作)

cd /data/demos
git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git
复制代码

2. 打包,执行代码 demo

再执行命令, 可以看到正常生产和消费输出

// 进入demo代码目录cd /data/demos/06-all-java-demos/
// 打包mvn clean package
// 运行代码mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.ConcurrentMessageDemo" -Dexec.classpathScope=runtime

复制代码

3. Demo 代码说明

Demo 代码可以查看 github。并发消息,意思是生产者可以并发的向 topic 中发送消息, 消费端不区分顺序的消息,这种模式效率最好。生产者 demo 代码如下:



最后留一个思考题给大家: 生产者实例和消费者实例, 都是线程安全的吗?

第 2 节 如何发送和消费顺序消息

顺序消息分为分区有序和全局有序。生产消费代码都是一样的, 区别在于分区有序的 topic 中 queue 个数可以是任意有效值,全局有序的 topic 要求 queue 的个数为 1。顺序消息的实现非常简单易懂,但牺牲了可用性,单节点故障会直接影响顺序消息。

什么是分区有序消息,什么场景应该使用呢,又该如何发送分区有序消息?分区有序表示在一个 queue 中的消息是有序的,发送消息时设置设置了相同 key 的消息会被发送到同一个 queue 中。

本教程将简单演示如何使用纯 java client 发送和消费顺序消息。

1. 下载 java 代码 demo(已下载则忽略操作)

cd /data/demos
git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git
复制代码

2. 打包,执行代码 demo

再执行命令, 可以看到正常生产和消费输出。 消费输出注意看相同 queue id 的消息输出内容中的数字,按照从小到大就是正确的。

// 进入demo代码目录cd /data/demos/06-all-java-demos/
// 打包mvn clean package
// 运行代码mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.OrderMessageDemo1" -Dexec.classpathScope=runtime

复制代码



3. Demo 代码说明

Demo 代码可以查看 github

  • 生产者说明

生产者会根据设置的 keys 做 hash,相同 hash 值的消息会发送到相同的 queue 中。所以相同 hash 值的消息需要保证在同一个线程中顺序的发送。



  • 消费者说明

消费者使用相对比较简单, 消息监听类实现 org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly 接口即可。相同 queue 的消息需要串行处理,这样救保证消费的顺序性



第 3 节 如何发送和消费延迟消息

延迟消息,对于一些特殊场景比如订票后 30 分钟不支付自动取消等类似场景比较有用。本教程将简单演示如何使用纯 java client 发送和消费延迟消息。

1. 下载 java 代码 demo(已下载则忽略操作)

cd /data/demos
git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git
复制代码

2. 打包,执行代码 demo

执行命令, 可以看到正常生产和消费输出。 目前 RocketMQ 支持多种延迟级别 , 不过每种延迟级别都是基于 RocketMQ 自身,实际延迟时间会加上 Broker-Client 端的网络情况不同而略有差异。

// 进入demo代码目录cd /data/demos/06-all-java-demos/
// 打包mvn clean package
// 运行代码mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.DelayMessageDemo" -Dexec.classpathScope=runtime

复制代码



3. Demo 代码说明

Demo 代码可以查看 github

  • 生产者说明

生产者在发送消息的时候需要设置延迟级别,RocketMQ 支持多种延迟级别。如果把延迟时间算作一个以空格分割的数组,延迟级别就是延迟时间数组的下标 index+1。RocketMQ 如何解析延迟级别和延迟时间映射关系。



  • 消费者说明:消费者按照并发消息消费即可

第 4 节 如何发送和消费事务消息

事务消息,是 RocketMQ 解决分布式事务的一种实现,极其简单好用。一个事物消息大致的生命周期如下图



概括为如下几个重要点:

  1. 生产者发送 half 消息(事物消息)

  2. Broker 存储 half 消息

  3. 生产者处理本地事物,处理成功后 commit 事物

  4. 消费者消费到事物消息

本教程将简单演示如何使用纯 java client 发送和消费事物消息。

1. 下载 java 代码 demo(已下载则忽略操作)

cd /data/demos
git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git
复制代码

2. 打包,执行代码 demo

执行命令, 可以看到事物消息的全部过程。

// 进入demo代码目录cd /data/demos/06-all-java-demos/
// 打包mvn clean package
// 运行代码mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.TransactionMessageDemo" -Dexec.classpathScope=runtime

复制代码

3. Demo 代码说明

Demo 代码可以查看 github。在事物消息中,消费代码和普通消息的消费一样,主要代码在生产者端。

生产者端的主要代码包含 3 个步骤:

  1. 初始化生产者,设置回调线程池、设置本地事物处理监听类。

这里注意事物消息的生产者类是: org.apache.rocketmq.client.producer.TransactionMQProducer, 而不是普通生产者类。



事物监听类需要实现 2 个方法,这里的逻辑都是 mock 的,实际使用的时候需要根据实际修改。



  1. 发送事物消息。调用 sendMessageInTransaction () 方法发送事物消息, 而不是以前的 send () 方法。



第 5 节 生产者消费者如何同步发送、消费消息(Request-Reply)

request-reply 模式,可以满足目前类似 RPC 同步调用的场景,本教程将简单演示如何使用该模式。

1. 下载 java 代码 demo(已下载则忽略操作)

cd /data/demos
git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git
复制代码

2. 打包,执行代码 demo

执行命令, 可以看到正常生产和消费输出。

// 进入demo代码目录cd /data/demos/06-all-java-demos/
// 打包mvn clean package
// 运行代码mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.RequestReplyMessageDemo" -Dexec.classpathScope=runtime

复制代码

通过代码结果和代码比较, 我们得知 request-reply 类似 RPC 同步调用的效果。



个人觉得:需要同步调用就用 RPC, 不要走 MQ,毕竟两者是完全不同的目标的产品,专业的事情交给专业的产品。

3. Demo 代码说明

Demo 代码可以查看 github

request-reply 模式,在生产者和消费者两端都和一般的生产消费有区别,下面分别介绍下 demo 代码。

生产者 demo 主要代码,主要区别在于调用 request (),而不是 send () 方法。



消费者 demo 主要代码:消费代码主要增加了 “回复” 逻辑。回复是利用消息发送直接向生产者发送一条消息。 有点类似事物消息中 broker 回查生产者。



一个小问题:事物消息和 request-reply 消息时,生产者的生产者组名有什么要求嘛?

第 6 节 如何有选择性的消费消息

有时候我们只想消费部分消息, 当然全部消费,在代码中过滤。 假如消息海量时, 会有很多资源浪费,比如浪费不必要的带宽。我们可以通过 tag,sql92 表达式来选择性的消费。

  • 进入 broker 目录

cd /usr/local/services/5-rocketmq/broker-01
复制代码
  • 编辑配置文件,修改 broker 配置项 2 个

vim conf/broker.conf
复制代码

配置项值:

// 是否支持重试消息也过滤filterSupportRetry=true
// 支持属性过滤enablePropertyFilter=true
复制代码

修改后:



  • 重启 broker

./restart.sh
复制代码

1. 下载 java 代码 demo(已下载则忽略操作)

cd /data/demos
git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git
复制代码

2. 打包,执行 tag 过滤代码 demo

执行命令, 可以看到正常生产和消费输出。

// 进入demo代码目录cd /data/demos/06-all-java-demos/
// 打包mvn clean package
// 运行代码mvn exec:java -Dexec.args="127.0.0.1:39876 tag" -Dexec.mainClass="org.apache.rocketmqdemos.FliterMessageDemo" -Dexec.classpathScope=runtime

复制代码

3. 执行 sql 过滤代码 demo

执行命令, 可以看到正常生产和消费输出。

// 进入demo代码目录cd /data/demos/06-all-java-demos/
// 打包mvn clean package
// 运行代码mvn exec:java -Dexec.args="127.0.0.1:39876 sql" -Dexec.mainClass="org.apache.rocketmqdemos.FliterMessageDemo" -Dexec.classpathScope=runtime

复制代码

4. Demo 代码说明

Demo 代码可以查看 github。以下分别介绍生产者和消费者主要 demo 代码。

  • 生产者

在生产 tag 消息的时候, 消息中需要加上发送 tag;sql92 过滤的时候,加上自定义 k-v。



  • 消费者

tag 过滤消费时,在订阅 topic 时, 也添加上 tag 订阅



SQL92 过滤时,添加上 SQL 过滤订阅。至于 SQL92 除了等号,还是支持什么,大家可以自行自行查看或者到群里问。



第 7 节 如何使用 ACL 客户端生产消费消息

ACL,全称是 Access Control List,是 RocketMQ 设计来做访问和权限控制的。更多文档参见 github wiki:https://github.com/apache/rocketmq/wiki/RIP-5-RocketMQ-ACL

0. 启动一个集群

  • 进入 broker 目录

cd /usr/local/services/5-rocketmq/broker-01
复制代码
  • 编辑配置文件,修改 broker 配置项 1 个

vim conf/broker.conf
复制代码

配置项值:

aclEnable=true
复制代码

修改后:



  • 重启 broker

./restart.sh
复制代码

1. 下载 java 代码 demo(已下载则忽略操作)

cd /data/demos
git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git
复制代码

2. 打包,执行代码 demo

执行命令, 可以看到正常生产和消费输出。 demo 代码使用的 admin 权限发送和消费,实际使用需要对于每个 topic,消费者组授权,才能正常生产消费。

// 进入demo代码目录cd /data/demos/06-all-java-demos/
// 打包mvn clean package
// 运行代码mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.ACLDemo" -Dexec.classpathScope=runtime

复制代码

3. Demo 代码说明

Demo 代码可以查看 github。带 ACL 的生产者和消费者在初始化的时候,都必须给一个 hook 实例,构建方法如下:

static RPCHook getAclRPCHook(String accessKey, String secretKey) {      return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));}
复制代码

在 broker 端 secret key 用来校验信息的完整性, access key 用来校验用户权限。二者缺一不可。

用户头像

还未添加个人签名 2022.07.05 加入

还未添加个人简介

评论

发布
暂无评论
使用Java客户端发送消息和消费的应用_云计算_hum建应用专家_InfoQ写作社区