写点什么

RocketMQ - 什么是 RocketMQ

用户头像
Java收录阁
关注
发布于: 2020 年 05 月 23 日

RocketMQ 是一个低延时、高可靠、可伸缩、易于使用的分布式消息中间件,是由阿里巴巴开源捐献给 Apache 的顶级项目。RocketMQ 具有高吞吐、低延迟、海量消息堆积等优点,同时提供顺序消息、事务消息、定时消息、消息重试于追踪等功能,非常适合在电商、金融等领域使用。

RocketMQ 的应用场景

RocketMQ 的应用场景如下:

  • 削峰填谷:诸如秒杀、抢红包等大型活动皆会带来较高的流量脉冲,很可能因为没有做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,RocketMQ 可提供削峰填谷的服务来解决这些问题。

  • 异步解耦:交易系统作为淘宝、天猫主站最核心的系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、流计算分析等,整体业务系统庞大而且复杂,RocketMQ 可实现异步通信和应用解耦,确保主站业务的连续性。

  • 顺序收发:细数一下,日常需要保证顺序的应用场景非常多,例如证券交易过程中的时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等,与先进先出原理类似,RocketMQ 提供的顺序消息即保证消息的 FIFO。

  • 分布式事务一致性:交易系统、红包等场景需要确保数据的最终一致性,大量引入 RocketMQ 的分布式事务,即可以实现系统之间的解耦,又可以保证最终的数据一致性。

  • 大数据分析:数据在"流动"中产生价值,传统数据分析大都基于批量计算模型,无法做到实时的数据分析,利用 RocketMQ 与流式计算引擎相结合,可以很方便地实现对业务数据进行实时分析。

  • 分布式缓存同步:电商促销的时候商品需要实时感知价格的变化,大量并发访问会导致页面响应时间长,集中式缓存因为带宽瓶颈限制商品变更的访问流量,通过 RocketMQ 构建分布式缓存,可实时通知商品数据的变化。

安装 RocketMQ(单机版)

安装 JDK

从官网下载编译好的安装包

解压压缩包

进入 bin 目录,启动 namesrv,启动 NameServer。默认情况下 NameServer 监听的端口是 9876。

tail -f ~/logs/rocketmqlogs/namesrv.log 可以查看启动日志

启动消息服务器 Broker,指定 NameServer 的 IP 地址和端口。默认情况下会加载 conf/broker.conf

输入 tail -f ~/logs/rocketmqlogs/broker.log 查看日志

如果 tail -f ~/logs/rocketmqlogs/broker.log 提示找不到文件,则打开当前目录下的 nohup.out

日志文件查看,出现如下日志表示启动失败,提示内存无法分配

内存不足的问题

这是因为 bin 目录下启动 nameserv 与 broker 的 runbroker.sh 和 runserver.sh 文件中默认分配的内

存太大,rocketmq 比较耗内存,所以默认分配的内存比较大,而系统实际内存却太小导致启动失败,

通常像虚拟机上安装的 CentOS 服务器内存可能是没有高的,只能调小。实际中应该根据服务器内存情况,配置一个合适的值。

解决办法

修改 runbroker.sh 和 runserver.sh

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512g"Xms 是指设定程序启动时占用内存大小。一般来讲,大点,程序会启动的快一点,但是也可能会导致机器暂时间变慢。Xmx 是指设定程序运行期间最大可占用的内存大小。如果程序运行需要占用更多的内存,超出了这个设置值,就会抛出OutOfMemory异常。xmn 年轻代的heap大小,一般设置为Xmx的3、4分之一
复制代码

停止服务

【sh mqshutdown broker】 //停止 brokersh

【sh mqshutdown namesrv】 //停止 nameserver

停止服务的时候需要注意,要先停止 broker,其次停止 nameserver。

broker.conf 文件

默认情况下,启动 broker 会加载 conf/broker.conf 文件,这个文件里面就是一些常规的配置信息

namesrvAddr //nameserver 地址

brokerCl usterName //Cluster 名称,如果集群机器数比较多,可以分成多个 cluster,每个 cluster 提供

给不同的业务场景使用

brokerName //broker 名称,如果配置主从模式,master 和 slave 需要配置相同的名称来表名关系

brokerId=0 //在主从模式中,一个 master broker 可以有多个 slave,0 表示 master,大于 0 表示不同

slave 的 id

brokerRole=SYNC_MASTER/ASYNC_MASTER/SLAVE ; 同步表示 slave 和 master 消息同步完成后再返回

信息给客户端

autoCreateTopicEnable = true ; topic 不存在的情况下自动创建

RocketMQ 如何发送消息

Spring Cloud Alibaba 已集成 RocketMQ,使用 Spring Cloud Stream 可以对 RocketMQ 发送和接收消息。

  1. 在 pom.xml 中引入 jar 包

<dependencies>		<dependency>			<groupId>org.springframework.boot</groupId>			<artifactId>spring-boot-starter-web</artifactId>		</dependency>		<dependency>			<groupId>com.alibaba.cloud</groupId>			<artifactId>spring-cloud-stream-binder-rocketmq</artifactId>		</dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-test-support</artifactId> <scope>test</scope> </dependency> </dependencies>
<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>2.3.0.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>2.1.1.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
复制代码
  1. 配置 application.properties

server.port=8080spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876spring.cloud.stream.bindings.output.destination=TopicTestspring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
复制代码
  1. 使用 Binder 发送消息

@SpringBootApplication@EnableBinding({Source.class})public class RocketmqDemoApplication {
public static void main(String[] args) { SpringApplication.run(RocketmqDemoApplication.class, args); }}
@RestControllerpublic class SendController { @Autowired private Source source;
@GetMapping("/send") public String send(String msg) { MessageBuilder builder = MessageBuilder.withPayload(msg); Message message = builder.build(); source.output().send(message); return "Hello RocketMQ, Send " + msg; }}
复制代码

@EnableBinding({Source.class})表示绑定配置文件中名称为 output 的消息通道 Binding,Source 类中定义的消息通道名称为 output。发送 http 请求 http://localhost:8080/send?msg=test 将消息发送到 RocketMQ 中。

实际项目中会存在多个发送消息通道,可以自定义消息通道的名称,参考 Source 类自定义一个接口,修改通道名称和相关配置即可。

public interface OrderSource {    String OUTPUT = "orderSourcec";        @Output(OrderSource.OUTPUT)    MessageChannel output();}
@SpringBootApplication@EnableBinding({Source.class, OrderSource.class})public class RocketmqDemoApplication {
public static void main(String[] args) { SpringApplication.run(RocketmqDemoApplication.class, args); }}
server.port=8080spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876spring.cloud.stream.bindings.output.destination=TopicTestspring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
spring.cloud.stream.bindings.orderOutput.destination=TopicOrderspring.cloud.stream.rocketmq.bindings.orderOutput.producer.group=order-group
复制代码

RocketMQ 如何消费消息

  1. 引入相关依赖

<dependencies>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency>        <dependency>            <groupId>com.alibaba.cloud</groupId>            <artifactId>spring-cloud-stream-binder-rocketmq</artifactId>        </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> </dependencies>
复制代码
  1. 配置 application.properties

server.port=8081spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876spring.cloud.stream.bindings.output.destination=TopicTestspring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
复制代码
  1. 定义消息监听

@EnableBinding({Sink.class})@SpringBootApplicationpublic class App {    public static void main( String[] args )    {        SpringApplication.run(App.class);    }        @StreamListener(Sink.INPUT)    public void receive(String msg) {        System.out.println("TopicTest receive: " + msg + ", receiveTime= " + System.currentTimeMillis());    }}
复制代码

@EnableBinding({Sink.class})表示绑定配置文件中名称为 input 的消息通道 Binding,Sink 类中定义的消息通道的名称为 input,@StreamListener 表示定义一个消息监听器,接收 RocketMQ 中的消息。

实际项目中会存在多个接收消息的通道,可以自定义消息通道的名称,参考 Sink 类自定义一个接口,修改通道名称和相关配置即可。

public interface InputChannel {
String USER_INPUT = "userInput"; String ORDER_INPUT = "orderInput";
@Input(InputChannel.USER_INPUT) SubscribableChannel userInput();
@Input(InputChannel.ORDER_INPUT) SubscribableChannel orderInput();}
@EnableBinding({Sink.class, InputChannel.class})@SpringBootApplicationpublic class App { public static void main( String[] args ) { SpringApplication.run(App.class); }
@StreamListener(Sink.INPUT) public void receive(String msg) { System.out.println("TopicTest receive: " + msg + ", receiveTime= " + System.currentTimeMillis()); }
@StreamListener(InputChannel.ORDER_INPUT) public void receiveOrderInput(String msg) { System.out.println(" receive: " + msg + ", receiveTime= " + System.currentTimeMillis()); }}
server.port=8081spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876spring.cloud.stream.bindings.input.destination=TopicTestspring.cloud.stream.rocketmq.bindings.input.producer.group=demo-group
spring.cloud.stream.bindings.orderInput.destination=TopicOrderspring.cloud.stream.rocketmq.bindings.orderInput.producer.group=order-group
复制代码


发布于: 2020 年 05 月 23 日阅读数: 131
用户头像

Java收录阁

关注

士不可以不弘毅,任重而道远 2020.04.30 加入

喜欢收集整理Java相关技术文档的程序员,欢迎关注同名微信公众号 Java收录 阁获取更多文章

评论

发布
暂无评论
RocketMQ - 什么是RocketMQ