RocketMQ 是一个低延时、高可靠、可伸缩、易于使用的分布式消息中间件,是由阿里巴巴开源捐献给 Apache 的顶级项目。RocketMQ 具有高吞吐、低延迟、海量消息堆积等优点,同时提供顺序消息、事务消息、定时消息、消息重试于追踪等功能,非常适合在电商、金融等领域使用。
RocketMQ 的应用场景
RocketMQ 的应用场景如下:
削峰填谷:诸如秒杀、抢红包等大型活动皆会带来较高的流量脉冲,很可能因为没有做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,RocketMQ 可提供削峰填谷的服务来解决这些问题。
异步解耦:交易系统作为淘宝、天猫主站最核心的系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、流计算分析等,整体业务系统庞大而且复杂,RocketMQ 可实现异步通信和应用解耦,确保主站业务的连续性。
顺序收发:细数一下,日常需要保证顺序的应用场景非常多,例如证券交易过程中的时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等,与先进先出原理类似,RocketMQ 提供的顺序消息即保证消息的 FIFO。
分布式事务一致性:交易系统、红包等场景需要确保数据的最终一致性,大量引入 RocketMQ 的分布式事务,即可以实现系统之间的解耦,又可以保证最终的数据一致性。
大数据分析:数据在"流动"中产生价值,传统数据分析大都基于批量计算模型,无法做到实时的数据分析,利用 RocketMQ 与流式计算引擎相结合,可以很方便地实现对业务数据进行实时分析。
分布式缓存同步:电商促销的时候商品需要实时感知价格的变化,大量并发访问会导致页面响应时间长,集中式缓存因为带宽瓶颈限制商品变更的访问流量,通过 RocketMQ 构建分布式缓存,可实时通知商品数据的变化。
安装 RocketMQ(单机版)
安装 JDK
从官网下载编译好的安装包
解压压缩包
进入 bin 目录,启动 namesrv,启动 NameServer。默认情况下 NameServer 监听的端口是 9876。
tail -f ~/logs/rocketmqlogs/namesrv.log 可以查看启动日志
启动消息服务器 Broker,指定 NameServer 的 IP 地址和端口。默认情况下会加载 conf/broker.conf
输入 tail -f ~/logs/rocketmqlogs/broker.log 查看日志
如果 tail -f ~/logs/rocketmqlogs/broker.log 提示找不到文件,则打开当前目录下的 nohup.out
日志文件查看,出现如下日志表示启动失败,提示内存无法分配
内存不足的问题
这是因为 bin 目录下启动 nameserv 与 broker 的 runbroker.sh 和 runserver.sh 文件中默认分配的内
存太大,rocketmq 比较耗内存,所以默认分配的内存比较大,而系统实际内存却太小导致启动失败,
通常像虚拟机上安装的 CentOS 服务器内存可能是没有高的,只能调小。实际中应该根据服务器内存情况,配置一个合适的值。
解决办法
修改 runbroker.sh 和 runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512g"
Xms 是指设定程序启动时占用内存大小。一般来讲,大点,程序会启动的快一点,但是也可能会导致机器暂时
间变慢。
Xmx 是指设定程序运行期间最大可占用的内存大小。如果程序运行需要占用更多的内存,超出了这个设置值,
就会抛出OutOfMemory异常。
xmn 年轻代的heap大小,一般设置为Xmx的3、4分之一
复制代码
停止服务
【sh mqshutdown broker】 //停止 brokersh
【sh mqshutdown namesrv】 //停止 nameserver
停止服务的时候需要注意,要先停止 broker,其次停止 nameserver。
broker.conf 文件
默认情况下,启动 broker 会加载 conf/broker.conf 文件,这个文件里面就是一些常规的配置信息
namesrvAddr //nameserver 地址
brokerCl usterName //Cluster 名称,如果集群机器数比较多,可以分成多个 cluster,每个 cluster 提供
给不同的业务场景使用
brokerName //broker 名称,如果配置主从模式,master 和 slave 需要配置相同的名称来表名关系
brokerId=0 //在主从模式中,一个 master broker 可以有多个 slave,0 表示 master,大于 0 表示不同
slave 的 id
brokerRole=SYNC_MASTER/ASYNC_MASTER/SLAVE ; 同步表示 slave 和 master 消息同步完成后再返回
信息给客户端
autoCreateTopicEnable = true ; topic 不存在的情况下自动创建
RocketMQ 如何发送消息
Spring Cloud Alibaba 已集成 RocketMQ,使用 Spring Cloud Stream 可以对 RocketMQ 发送和接收消息。
在 pom.xml 中引入 jar 包
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>2.3.0.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>2.1.1.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
复制代码
配置 application.properties
server.port=8080
spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876
spring.cloud.stream.bindings.output.destination=TopicTest
spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
复制代码
使用 Binder 发送消息
@SpringBootApplication
@EnableBinding({Source.class})
public class RocketmqDemoApplication {
public static void main(String[] args) {
SpringApplication.run(RocketmqDemoApplication.class, args);
}
}
@RestController
public class SendController {
@Autowired
private Source source;
@GetMapping("/send")
public String send(String msg) {
MessageBuilder builder = MessageBuilder.withPayload(msg);
Message message = builder.build();
source.output().send(message);
return "Hello RocketMQ, Send " + msg;
}
}
复制代码
@EnableBinding({Source.class})表示绑定配置文件中名称为 output 的消息通道 Binding,Source 类中定义的消息通道名称为 output。发送 http 请求 http://localhost:8080/send?msg=test 将消息发送到 RocketMQ 中。
实际项目中会存在多个发送消息通道,可以自定义消息通道的名称,参考 Source 类自定义一个接口,修改通道名称和相关配置即可。
public interface OrderSource {
String OUTPUT = "orderSourcec";
@Output(OrderSource.OUTPUT)
MessageChannel output();
}
@SpringBootApplication
@EnableBinding({Source.class, OrderSource.class})
public class RocketmqDemoApplication {
public static void main(String[] args) {
SpringApplication.run(RocketmqDemoApplication.class, args);
}
}
server.port=8080
spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876
spring.cloud.stream.bindings.output.destination=TopicTest
spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
spring.cloud.stream.bindings.orderOutput.destination=TopicOrder
spring.cloud.stream.rocketmq.bindings.orderOutput.producer.group=order-group
复制代码
RocketMQ 如何消费消息
引入相关依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rocketmq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
复制代码
配置 application.properties
server.port=8081
spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876
spring.cloud.stream.bindings.output.destination=TopicTest
spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
复制代码
定义消息监听
@EnableBinding({Sink.class})
@SpringBootApplication
public class App
{
public static void main( String[] args )
{
SpringApplication.run(App.class);
}
@StreamListener(Sink.INPUT)
public void receive(String msg) {
System.out.println("TopicTest receive: " + msg + ", receiveTime= " + System.currentTimeMillis());
}
}
复制代码
@EnableBinding({Sink.class})表示绑定配置文件中名称为 input 的消息通道 Binding,Sink 类中定义的消息通道的名称为 input,@StreamListener 表示定义一个消息监听器,接收 RocketMQ 中的消息。
实际项目中会存在多个接收消息的通道,可以自定义消息通道的名称,参考 Sink 类自定义一个接口,修改通道名称和相关配置即可。
public interface InputChannel {
String USER_INPUT = "userInput";
String ORDER_INPUT = "orderInput";
@Input(InputChannel.USER_INPUT)
SubscribableChannel userInput();
@Input(InputChannel.ORDER_INPUT)
SubscribableChannel orderInput();
}
@EnableBinding({Sink.class, InputChannel.class})
@SpringBootApplication
public class App
{
public static void main( String[] args )
{
SpringApplication.run(App.class);
}
@StreamListener(Sink.INPUT)
public void receive(String msg) {
System.out.println("TopicTest receive: " + msg + ", receiveTime= " + System.currentTimeMillis());
}
@StreamListener(InputChannel.ORDER_INPUT)
public void receiveOrderInput(String msg) {
System.out.println(" receive: " + msg + ", receiveTime= " + System.currentTimeMillis());
}
}
server.port=8081
spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876
spring.cloud.stream.bindings.input.destination=TopicTest
spring.cloud.stream.rocketmq.bindings.input.producer.group=demo-group
spring.cloud.stream.bindings.orderInput.destination=TopicOrder
spring.cloud.stream.rocketmq.bindings.orderInput.producer.group=order-group
复制代码
评论