写点什么

大家心心念念的 RocketMQ5.x 入门手册来喽

  • 2023-02-12
    上海
  • 本文字数:3590 字

    阅读完需:约 12 分钟

大家心心念念的RocketMQ5.x入门手册来喽

大家好,我是威哥,RocketMQ 官方社区优秀布道师、QCon 研习社讲师、RocketMQ 全球开发者峰会讲师、CSDN2020 博客之星 TOP2,极客时间《中间件核心技术与实战》专栏作者、 《RocketMQ 技术内幕》、《RocketMQ 实战》两本图书的第一作者,热衷于中间件领域的技术研究与分享,维护「中间件兴趣圈」公众号,陆续发表 350+原创文章。目前担任中通快递技术平台部资深架构师,主要负责消息、缓存、数据同步、搜索、网关等中间件产品的研发与落地,拥有千亿级消息集群的运维经验。

1、前言

为了更好的拥抱云原生,RocketMQ5.x 架构进行了大的重构,提出了存储与计算分离的设计架构,架构设计图如下所示:

00

RocketMQ5.x 提供了一套非常建议的消息发送、消费 API,并统一放在 Apache 顶级开源项目 rocketmq-clients 下,链接:https://github.com/apache/rocketmq-clients,提供了 cpp、go、java、php、rust 的实现,多语言生态初现,如下图所示:

01

2、源码级调试 RocketMQ 5.x

当 RocketMQ 为了顺应云原生大潮,提出存储与计算分离后,想必我相信很多粉丝朋友和我一样,都希望尽快一睹 RocketMQ5.x 的”芳颜“,如果还没有在 IDE 中调试通过的小伙伴,那就跟着我的步骤来,带你一起体验 RocketMQ 5.x。

Step1:从 github(https://github.com/apache/rocketmq)下载源码,并导入到 IDEA 中,如下图所示:

02

相比 RocketMQ4.x,5.x 主要是增加了一个代理模块(rocketmq-proxy),将路由、计算等功能从 Broker 中剥离出来。

Step2:创建一个 RocketMQ 主目录,并在主目录中创建 conf 文件夹,并把源码中 distribution 模块中 conf 下的文件拷贝到当前目录,如下图所示:

03

Step3:从 namesrv 模块中找到类 NamesrvStartup 类,配置后运行,如下图所示:

04

这里的关键点在于需要配置环境变量 ROCKETMQ_HOME,其路径设置为【Step2】中创建的目录,然后启动该类,输出如下所示表示 NameServer 启动成功。

The Name Server boot success. serializeType=JSON
复制代码

Step4:从 broker 模块中找到类 BrokerStartup,配置后运行,效果如下图所示:

05

这里有两个要点:

  • 通过 -c 参数指定 broker 配置文件的位置

  • 设置 ROCKETMQ_HOME 环境变量,其路径就是上文中 conf 目录所在的父目录

Step5:启动 proxy 模块,如下图所示:

06

设置好环境变量 RMQ_PROXY_HOME 环境变量,直接启动,会抛出如下错误:

07

原因是 RocketMQ Proxy 在启动时会 RMQ_PROXY_HOME 加载日志文件,我们从源码模块中 distribution 中 logback_proxy.xml 拷贝到 proxy 主目录的 conf 文件夹下。

再次尝试启动,抛出如下错误:

08

需要再从源码模块中 distribution 中 rmq-proxy.json 拷贝到 proxy 主目录的 conf 文件夹下,启动成功如下所示:

09

那问题来了,rmq-proxy.json 文件中的内容是多少呢?

{  "rocketMQClusterName": "DefaultCluster"}
复制代码

那这个文件中又可以陪着哪些参数呢?这个目前无法从官方网站中获取,大家可以去查看 org.apache.rocketmq.proxy.config.ProxyConfig,里面所有的属性都可以在这个文件中配置。

Nameserver、broker、Proxy 都已经启动成功了,那我们如何发送消息呢?

由于 RocketMQ 5.x 引入了 Proxy,原先的 RocketMQ Client API 不能直接使用,RocketMQ 官方提供了一套极简 API,API 的完整定义在 Apache 顶级开源项目 rocketmq-apis(https://github.com/apache/rocketmq-apis),具体的定义如下图所示:

10

具体的实现在https://github.com/apache/rocketmq-clients,实现了 cpp、golang、java、php、rust 的实现。

接下来,我们使用一下 java 版本的客户端尝试发送一条消息,代码如下所示:

<dependency>      <groupId>org.apache.rocketmq</groupId>      <artifactId>rocketmq-client-apis</artifactId>      <version>5.0.0</version>    </dependency>
    <dependency>      <groupId>org.apache.rocketmq</groupId>      <artifactId>rocketmq-client-java</artifactId>      <version>5.0.0</version>    </dependency>  import org.apache.rocketmq.client.apis.ClientConfiguration;import org.apache.rocketmq.client.apis.ClientServiceProvider;import org.apache.rocketmq.client.apis.SessionCredentialsProvider;import org.apache.rocketmq.client.apis.StaticSessionCredentialsProvider;import org.apache.rocketmq.client.apis.message.Message;import org.apache.rocketmq.client.apis.producer.Producer;import org.apache.rocketmq.client.apis.producer.SendReceipt;
import java.nio.charset.StandardCharsets;import java.time.Duration;import java.util.concurrent.CompletableFuture;
public class RocketMQProxyTest {
    public static void main(String[] args) throws Exception {

        final ClientServiceProvider provider = ClientServiceProvider.loadService();
        // Credential provider is optional for client configuration.        String accessKey = "yourAccessKey";        String secretKey = "yourSecretKey";        SessionCredentialsProvider sessionCredentialsProvider =                new StaticSessionCredentialsProvider(accessKey, secretKey);
        String endpoints = "127.0.0.1:8081";        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()                .setEndpoints(endpoints)                .setCredentialProvider(sessionCredentialsProvider)                .setRequestTimeout(Duration.ofSeconds(30))                .build();        String topic = "TopicTest";        final Producer producer = provider.newProducerBuilder()                .setClientConfiguration(clientConfiguration)                // Set the topic name(s), which is optional. It makes producer could prefetch the topic route before                // message publishing.                .setTopics(topic)                // May throw {@link ClientException} if the producer is not initialized.                .build();        // Define your message body.        byte[] body = "This is a normal message for Apache RocketMQ".getBytes(StandardCharsets.UTF_8);        String tag = "yourMessageTagA";

        final Message message = provider.newMessageBuilder()                // Set topic for the current message.                .setTopic(topic)                // Message secondary classifier of message besides topic.                .setTag(tag)                // Key(s) of the message, another way to mark message besides message id.                .setKeys("yourMessageKey-0e094a5f9d85")                .setBody(body)                .build();        final CompletableFuture<SendReceipt> future = producer.sendAsync(message);        future.whenComplete((sendReceipt, throwable) -> {            if (null == throwable) {                System.out.println("Send message successfully, messageId=" + sendReceipt.getMessageId());            } else {                System.out.println("Failed to send message");            }        });        // Block to avoid exist of background threads.        Thread.sleep(Long.MAX_VALUE);        // Close the producer when you don't need it anymore.        producer.close();    }}
复制代码

运行结果:

Send message successfully, messageId=01C6A0F34F62CB328C03EFF3EF00000000
复制代码

运行成功,在这里给大家留一个作业,那消息消费如何写呢?如有疑问,可以加我微信 dingwpmz,我们一起交流。


文章首发:https://www.codingw.net/Article?id=783

发布于: 刚刚阅读数: 4
用户头像

『中间件兴趣圈』《RocketMQ技术内幕》 2020-11-30 加入

《RocketMQ技术内幕》作者、RocketMQ社区优秀布道师、中通科技技术平台部资深架构师、专注于JAVA中间件领域的源码分析、原理与实战。

评论

发布
暂无评论
大家心心念念的RocketMQ5.x入门手册来喽_RocketMQ_中间件兴趣圈_InfoQ写作社区