写点什么

快速入门一篇搞定 RocketMq- 实现微服务实战落地

  • 2024-05-06
    福建
  • 本文字数:4879 字

    阅读完需:约 16 分钟

1、RocketMq 介绍


RocketMQ 起源于阿里巴巴,最初是为了解决邮件系统的高可靠性和高性能而设计的。在 2016 年开源分布式消息中间件,并逐渐成为 Apache 顶级项目。现在是 Apache 的一个顶级项目,在阿里内部使用非常广泛,已经经过了"双 11"这种万亿级的消息流转,性能稳定、高效。


官网地址:https://rocketmq.apache.org

快速开始文档:https://rocketmq.apache.org/docs/

Github 地址:https://github.com/apache/rocketmq


2、RocketMq 架构说明


RocketMQ 的架构主要由 Producer(消息生产者)、Consumer(消息消费者)、Broker(消息中转角色)和 Name Server(网络路由角色)四个核心组件组成。Name Server 负责维护 Broker 集群和 Topic 信息的路由中心,而 Broker 负责存储和传输消息。RocketMQ 采用类似于 Kafka 的发布订阅模型,支持消息的顺序传输和事务性传输,同时可以配置不同的消息过滤规则和重试策略。


3、下载


查看微服务对应版本信息,下载相关版本。查看连接:https://github.com/alibaba/spring-cloud-alibaba/wiki/版本说明#2021x-分支



根据自己使用的 Spring Cloud Alibaba Version 选择对应的版本进行下载即可。这里下载 4.4.0 版本,下载地址:https://rocketmq.apache.org/download 下载成功后,为一个压缩包文件。把文件上传 linux



使用命令解压 zip 文件并重命名文件夹命令:

unzip rocketmq-all-4.4.0-bin-release.zip -d rocketmq-4.4.0
复制代码


解压成功后,如图:



4、启动


进入 rocketmq-4.4.0 目录,查看目录结构。



  • benchmark:性能测试相关的资源,如果想要了解 RocketMQ 的基准测试,可以考虑使用该压测工具。这个工具可以模拟生产者和消费者来测试 RocketMQ 集群的性能。

  • bin:里面是一些可执行文件,管理 rocketmq 服务

  • conf:里面就是一些配置文件,包括 broker 配置文件和 logback 配置文件

  • lib:所依赖的第三方 jar 包


4.1、启动 Name Server 命令


nohup sh bin/mqnamesrv -n 192.168.42.130:9876 > /dev/null 2>&1 &   # -n 后面IP为公网IP 必须指定其公网IP,不然会连接失败
复制代码


启动成功后,默认启动日志在 root 目录下。可以查看启动日志信息:

tail -f ~/logs/rocketmqlogs/namesrv.log
复制代码


输出下面信息启动成功:



也可以通过端口 9876 查看是否启动成功

ps -ef|grep 9876
复制代码



4.2 启动 Broker 命令
nohup sh bin/mqbroker -n 192.168.42.130:9876 -c conf/broker.conf autoCreateTopicEnable=true >/dev/null 2>&1 & # -n 后面IP为公网IP 必须指定其公网IP,不然会连接失败
复制代码


启动日志和启动 Name Server 日志在一个文件夹里面。查看启动日志信息:

tail -n 50 ~/logs/rocketmqlogs/broker.log
复制代码



可以通过 jps 查看启动信息如果能看到 NamesrvStartup 和 BrokerStartup 的话就表明单机版的 RocketMQ 启动成功了



4.3 Rocketmq 服务关闭


关闭 MQ 使用 bin 目录下的 mqshutdown 关闭服务

sh bin/mqshutdown namesrv #关闭namesrv服务
sh bin/mqshutdown broker #关闭broker服务
复制代码


4.4 启动脚本命令参数修改


在启动的过程中,如果服务器内存不足或者满足不了启动脚本里面的默认内存配置,启动的时候会启动报错。这是因为 apache-rocketmq/bin 目录下启动 nameserv 与 broker 的 runbroker.sh 和 runserver.sh 文件中默认分配的内存太大,而系统实际内存却太小导致启动失败。解决办法就是修改 runbroker.sh 和 runserver.sh 里的内存配置,调小一些即可。


首先先备份一份 runbroker.sh 和 runserver.sh 文件,以防万一改错了。

cp runserver.sh runserver.sh.init
cp runbroker.sh runbroker.sh.init
复制代码


修改:runserver.sh 脚本文件,找到配置 JVM 参数的内容,把 JVM 配置参数调小:

JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn64m -XX:MetaspaceSize=64m -XX:MaxMetaspaceSize=160m"
复制代码



修改:runbroker.sh 脚本文件

JAVA_OPT="${JAVA_OPT} -server -Xms128m -Xmx128m -Xmn64m"
复制代码



5、测试消息


通过上面的步骤,RocketMQ 就启动成功了。接下来我们可以在服务器上面通过提供的测试脚本进行消息测试,验证 RocketMq 是否可以正常使用。


生产者发送消息:

export NAMESRV_ADDR=127.0.0.1:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
复制代码



通过输出内容,我们可以查看到消息发送成功了。下面运行监听脚本。测试消费者接受消息:

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
复制代码



成功拿到消息,可以说明 RocketMq 服务启动成功了。


6、监控程序 rocketmq-console


6.1、配置 rocketmq-console


rocketmq-externals 是 RocketMq 的扩展插件项目。GitHub 地址: https://github.com/apache/rocketmq-externals 之前 rocketmq-console 也在 rocketmq-externals 项目中。如今在 GitHub apache/rocketmq-externals 项目下已经找不到 rocketmq-console 模块了,官方已经从 apache/rocketmq-externals 独立出来并更名为 rocketmq-dashboard。 我们可以查看 RocketMq 官网配置仪表板说明 :RocketMQ 仪表板 |MQ (apache.org)

https://rocketmq.apache.org/docs/deploymentOperations/04Dashboard/
复制代码


根据提示可以下载到源码内容



Github 下载地址:https://github.com/apache/rocketmq-dashboard


如果是 5.0 版本的直接拉取最新的代码

 git clone https://github.com/apache/rocketmq-dashboard.git  
复制代码


releases 标签中的 rocketmq-dashboard-1.0.0 版本试用于 5.0 版本以下的。

https://github.com/apache/rocketmq-dashboard/releases/tag/rocketmq-dashboard-1.0.0
复制代码


下载成功后,使用 IDEA 打开修改配置,改一下 namesrvAddr 配置项即可,如果没有指定默认就是 localhost:9876,如果 namesrvAddr 是集群环境,每个节点使用;隔开。本地测试运行,运行成功后打包发布的 linux 系统。

mvn clean package -Dmaven.test.skip=true #跳过测试
复制代码


6.2 启动 rocketmq-console


指定 NameServer 的地址和启动端口(8830)以及输出日志。由于内部不够,设置 JVM 参数启动,如果使用的 linux 系统内存足够可以忽略 jvm 参数。启动命令如下:

nohup java -jar -Xmx256M -Xms256M -XX:MaxMetaspaceSize=128M -XX:MetaspaceSize=128M rocketmq-dashboard-1.0.0.jar --server.port=8830 --rocketmq.config.namesrvAddr=127.0.0.1:9876 > /dev/null 2>&1 &
复制代码


不指定 JVM 参数:

nohup java -jar  rocketmq-dashboard-1.0.0.jar --server.port=8830 --rocketmq.config.namesrvAddr=127.0.0.1:9876 > /dev/null 2>&1 &
复制代码


执行成功后,查看启动日志:

 tail -f ~/logs/consolelogs/rocketmq-console.log 
复制代码



启动成功。开放 8830 端口进行公网访问。



监控成功。可以在集群导航中查看当前节点部署节点。



也可以看到上面测试的数据输出:



7、微服务连接 RockerMq


安全组需要开放 10909、10911 端口和 9876 端口,其中 10909 是 VIP 通道,10911 是非 VIP 通道,9876 是对外连接提供端口。不然连接发送会报错发送超时 sendDefaultImpl call timeout; nested exception is org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout

maven 引入依赖

        <!--RocketMQ-->        <dependency>            <groupId>org.apache.rocketmq</groupId>            <artifactId>rocketmq-spring-boot-starter</artifactId>            <version>2.2.1</version>        </dependency>
复制代码


配置中心加入 RocketMq 配置

rocketmq: # rocketMQ配置  # name server地址  name-server: 192.168.42.130:9876  consumer:    pull-batch-size: 10    group: blog_message  producer:    group: blog_message    # 发送消息超时时间,默认3000    sendMessageTimeout: 10000    # 发送消息失败重试次数,默认2    retryTimesWhenSendFailed: 2    # 异步消息重试此处,默认2    retryTimesWhenSendAsyncFailed: 2    # 消息最大长度,默认1024 * 1024 * 4(默认4M)    maxMessageSize: 4096    # 压缩消息阈值,默认4k(1024 * 4)    compressMessageBodyThreshold: 4096    # 是否在内部发送失败时重试另一个broker,默认false    retryNextServer: false
复制代码


编写 RocketEnhanceConfig 文件,解决不支持 Java 时间类型配置

@Configurationpublic class RocketEnhanceConfig {
/** * 解决RocketMQ Jackson不支持Java时间类型配置 * 源码参考:{@link org.apache.rocketmq.spring.autoconfigure} */ @Bean @Primary public RocketMQMessageConverter enhanceRocketMQMessageConverter(){ RocketMQMessageConverter converter = new RocketMQMessageConverter(); CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter(); List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters(); for (MessageConverter messageConverter : messageConverterList) { if(messageConverter instanceof MappingJackson2MessageConverter){ MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter; ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper(); objectMapper.registerModules(new JavaTimeModule()); } } return converter; }}
复制代码


7.1 编写消息生产者:
@Slf4j@Servicepublic class RocketStorage implements IDataStorage {  		@Autowired(required = true)    private RocketMQTemplate rocketMQTemplate;
@Value("${rocketMq.topic:blog_notify_sow}") private String topic;
@Override public void store(String value, Integer type, Long timestamp) { String message = String.format("%s,%s,%s",value,type,timestamp); rocketMQTemplate.convertAndSend(topic,message); //发送数据 log.info("RocketMQ|data sent,value: {}, type:{}, timestamp: {}", value, type, timestamp); }
@Override public String getType() { return "RocketMQ"; }}
复制代码


编写接口:IDataStorage

/** * 数据发送到Mq里... */public interface IDataStorage {
/** * persistence data * * @param value 接收内容 * @param type 数据类型 * @param timestamp 当前时间(时间戳) */ void store(String value,Integer type,Long timestamp);
String getType();
}
复制代码


在 Controller 中调用接口发送数据。

@RestController@RequestMapping("/dataStorage")public class DataStorageController {
@Autowired private IDataStorage dataStorage;
@GetMapping public Response sendDataStorage(String value){ dataStorage.store(value,type,System.currentTimeMillis()); return Response.success(); }
}
复制代码


7.2 编写消息消费者


编写一个 RocketMq 消息监听类实现消息监听 RocketDataConsumer :

@Service@Slf4j@RocketMQMessageListener(consumerGroup = "blog_message",topic = "blog_notify_sow")public class RocketDataConsumer implements RocketMQListener {
@PostConstruct public void post() { log.warn("***** RocketMq Data Consumer Activated"); }
@Autowired @Qualifier("dataPersist") private IDataPersist dataPersist;

@Override public void onMessage(Object o) { log.info("RocketMq 接收到的信息 . . . . . .:{}",o); dataPersist.put(o.toString(),1,System.currentTimeMillis()); }}
复制代码


7.3 测试消息发送和接收


启动项目,通过 postman 调用接口:



调用接口后,发现接口调用成功了。下面我们查看控制台消息消费者是否接收到消息。



通过上面输出的消息可以看到消息接收成功了。


文章转载自:sowler

原文链接:https://www.cnblogs.com/sowler/p/18173752

体验地址:http://www.jnpfsoft.com/?from=infoq

用户头像

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
快速入门一篇搞定RocketMq-实现微服务实战落地_不在线第一只蜗牛_InfoQ写作社区