RocketMQ 是一个低延时、高可靠、可伸缩、易于使用的分布式消息中间件,是由阿里巴巴开源捐献给 Apache 的顶级项目。RocketMQ 具有高吞吐、低延迟、海量消息堆积等优点,同时提供顺序消息、事务消息、定时消息、消息重试于追踪等功能,非常适合在电商、金融等领域使用。
RocketMQ 的应用场景
RocketMQ 的应用场景如下:
- 削峰填谷:诸如秒杀、抢红包等大型活动皆会带来较高的流量脉冲,很可能因为没有做相应的保护而导致系统超负荷甚至崩溃,或因限制太过导致请求大量失败而影响用户体验,RocketMQ 可提供削峰填谷的服务来解决这些问题。 
- 异步解耦:交易系统作为淘宝、天猫主站最核心的系统,每笔交易订单数据的产生会引起几百个下游业务系统的关注,包括物流、购物车、积分、流计算分析等,整体业务系统庞大而且复杂,RocketMQ 可实现异步通信和应用解耦,确保主站业务的连续性。 
- 顺序收发:细数一下,日常需要保证顺序的应用场景非常多,例如证券交易过程中的时间优先原则,交易系统中的订单创建、支付、退款等流程,航班中的旅客登机消息处理等,与先进先出原理类似,RocketMQ 提供的顺序消息即保证消息的 FIFO。 
- 分布式事务一致性:交易系统、红包等场景需要确保数据的最终一致性,大量引入 RocketMQ 的分布式事务,即可以实现系统之间的解耦,又可以保证最终的数据一致性。 
- 大数据分析:数据在"流动"中产生价值,传统数据分析大都基于批量计算模型,无法做到实时的数据分析,利用 RocketMQ 与流式计算引擎相结合,可以很方便地实现对业务数据进行实时分析。 
- 分布式缓存同步:电商促销的时候商品需要实时感知价格的变化,大量并发访问会导致页面响应时间长,集中式缓存因为带宽瓶颈限制商品变更的访问流量,通过 RocketMQ 构建分布式缓存,可实时通知商品数据的变化。 
安装 RocketMQ(单机版)
安装 JDK
从官网下载编译好的安装包
解压压缩包
进入 bin 目录,启动 namesrv,启动 NameServer。默认情况下 NameServer 监听的端口是 9876。
tail -f ~/logs/rocketmqlogs/namesrv.log 可以查看启动日志
启动消息服务器 Broker,指定 NameServer 的 IP 地址和端口。默认情况下会加载 conf/broker.conf
输入 tail -f ~/logs/rocketmqlogs/broker.log 查看日志
如果 tail -f ~/logs/rocketmqlogs/broker.log 提示找不到文件,则打开当前目录下的 nohup.out
日志文件查看,出现如下日志表示启动失败,提示内存无法分配
内存不足的问题
这是因为 bin 目录下启动 nameserv 与 broker 的 runbroker.sh 和 runserver.sh 文件中默认分配的内
存太大,rocketmq 比较耗内存,所以默认分配的内存比较大,而系统实际内存却太小导致启动失败,
通常像虚拟机上安装的 CentOS 服务器内存可能是没有高的,只能调小。实际中应该根据服务器内存情况,配置一个合适的值。
解决办法
修改 runbroker.sh 和 runserver.sh
 JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512g"Xms 是指设定程序启动时占用内存大小。一般来讲,大点,程序会启动的快一点,但是也可能会导致机器暂时间变慢。Xmx 是指设定程序运行期间最大可占用的内存大小。如果程序运行需要占用更多的内存,超出了这个设置值,就会抛出OutOfMemory异常。xmn 年轻代的heap大小,一般设置为Xmx的3、4分之一
   复制代码
 停止服务
【sh mqshutdown broker】 //停止 brokersh
【sh mqshutdown namesrv】 //停止 nameserver
停止服务的时候需要注意,要先停止 broker,其次停止 nameserver。
broker.conf 文件
默认情况下,启动 broker 会加载 conf/broker.conf 文件,这个文件里面就是一些常规的配置信息
namesrvAddr //nameserver 地址
brokerCl usterName //Cluster 名称,如果集群机器数比较多,可以分成多个 cluster,每个 cluster 提供
给不同的业务场景使用
brokerName //broker 名称,如果配置主从模式,master 和 slave 需要配置相同的名称来表名关系
brokerId=0 //在主从模式中,一个 master broker 可以有多个 slave,0 表示 master,大于 0 表示不同
slave 的 id
brokerRole=SYNC_MASTER/ASYNC_MASTER/SLAVE ; 同步表示 slave 和 master 消息同步完成后再返回
信息给客户端
autoCreateTopicEnable = true ; topic 不存在的情况下自动创建
RocketMQ 如何发送消息
Spring Cloud Alibaba 已集成 RocketMQ,使用 Spring Cloud Stream 可以对 RocketMQ 发送和接收消息。
- 在 pom.xml 中引入 jar 包 
 <dependencies>		<dependency>			<groupId>org.springframework.boot</groupId>			<artifactId>spring-boot-starter-web</artifactId>		</dependency>		<dependency>			<groupId>com.alibaba.cloud</groupId>			<artifactId>spring-cloud-stream-binder-rocketmq</artifactId>		</dependency>
		<dependency>			<groupId>org.springframework.boot</groupId>			<artifactId>spring-boot-starter-test</artifactId>			<scope>test</scope>			<exclusions>				<exclusion>					<groupId>org.junit.vintage</groupId>					<artifactId>junit-vintage-engine</artifactId>				</exclusion>			</exclusions>		</dependency>		<dependency>			<groupId>org.springframework.cloud</groupId>			<artifactId>spring-cloud-stream-test-support</artifactId>			<scope>test</scope>		</dependency>	</dependencies>
	<dependencyManagement>		<dependencies>			<dependency>				<groupId>org.springframework.cloud</groupId>				<artifactId>spring-cloud-dependencies</artifactId>				<version>${spring-cloud.version}</version>				<type>pom</type>				<scope>import</scope>			</dependency>			<dependency>				<groupId>org.springframework.boot</groupId>				<artifactId>spring-boot-dependencies</artifactId>				<version>2.3.0.RELEASE</version>				<type>pom</type>				<scope>import</scope>			</dependency>			<dependency>				<groupId>com.alibaba.cloud</groupId>				<artifactId>spring-cloud-alibaba-dependencies</artifactId>				<version>2.1.1.RELEASE</version>				<type>pom</type>				<scope>import</scope>			</dependency>		</dependencies>	</dependencyManagement>
   复制代码
 - 配置 application.properties 
 server.port=8080spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876spring.cloud.stream.bindings.output.destination=TopicTestspring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
   复制代码
 - 使用 Binder 发送消息 
 @SpringBootApplication@EnableBinding({Source.class})public class RocketmqDemoApplication {
	public static void main(String[] args) {		SpringApplication.run(RocketmqDemoApplication.class, args);	}}
@RestControllerpublic class SendController {    @Autowired    private Source source;
    @GetMapping("/send")    public String send(String msg) {        MessageBuilder builder = MessageBuilder.withPayload(msg);        Message message = builder.build();        source.output().send(message);        return "Hello RocketMQ, Send " + msg;    }}
   复制代码
 @EnableBinding({Source.class})表示绑定配置文件中名称为 output 的消息通道 Binding,Source 类中定义的消息通道名称为 output。发送 http 请求 http://localhost:8080/send?msg=test 将消息发送到 RocketMQ 中。
实际项目中会存在多个发送消息通道,可以自定义消息通道的名称,参考 Source 类自定义一个接口,修改通道名称和相关配置即可。
 public interface OrderSource {    String OUTPUT = "orderSourcec";        @Output(OrderSource.OUTPUT)    MessageChannel output();}
@SpringBootApplication@EnableBinding({Source.class, OrderSource.class})public class RocketmqDemoApplication {
	public static void main(String[] args) {		SpringApplication.run(RocketmqDemoApplication.class, args);	}}
server.port=8080spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876spring.cloud.stream.bindings.output.destination=TopicTestspring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
spring.cloud.stream.bindings.orderOutput.destination=TopicOrderspring.cloud.stream.rocketmq.bindings.orderOutput.producer.group=order-group
   复制代码
 RocketMQ 如何消费消息
- 引入相关依赖 
 <dependencies>        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-web</artifactId>        </dependency>        <dependency>            <groupId>com.alibaba.cloud</groupId>            <artifactId>spring-cloud-stream-binder-rocketmq</artifactId>        </dependency>
        <dependency>            <groupId>org.springframework.boot</groupId>            <artifactId>spring-boot-starter-test</artifactId>            <scope>test</scope>            <exclusions>                <exclusion>                    <groupId>org.junit.vintage</groupId>                    <artifactId>junit-vintage-engine</artifactId>                </exclusion>            </exclusions>        </dependency>    </dependencies>
   复制代码
 - 配置 application.properties 
 server.port=8081spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876spring.cloud.stream.bindings.output.destination=TopicTestspring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group
   复制代码
 - 定义消息监听 
 @EnableBinding({Sink.class})@SpringBootApplicationpublic class App {    public static void main( String[] args )    {        SpringApplication.run(App.class);    }        @StreamListener(Sink.INPUT)    public void receive(String msg) {        System.out.println("TopicTest receive: " + msg + ", receiveTime= " + System.currentTimeMillis());    }}
   复制代码
 @EnableBinding({Sink.class})表示绑定配置文件中名称为 input 的消息通道 Binding,Sink 类中定义的消息通道的名称为 input,@StreamListener 表示定义一个消息监听器,接收 RocketMQ 中的消息。
实际项目中会存在多个接收消息的通道,可以自定义消息通道的名称,参考 Sink 类自定义一个接口,修改通道名称和相关配置即可。
 public interface InputChannel {
    String USER_INPUT = "userInput";    String ORDER_INPUT = "orderInput";
    @Input(InputChannel.USER_INPUT)    SubscribableChannel userInput();
    @Input(InputChannel.ORDER_INPUT)    SubscribableChannel orderInput();}
@EnableBinding({Sink.class, InputChannel.class})@SpringBootApplicationpublic class App {    public static void main( String[] args )    {        SpringApplication.run(App.class);    }
    @StreamListener(Sink.INPUT)    public void receive(String msg) {        System.out.println("TopicTest receive: " + msg + ", receiveTime= " + System.currentTimeMillis());    }
    @StreamListener(InputChannel.ORDER_INPUT)    public void receiveOrderInput(String msg) {        System.out.println(" receive: " + msg + ", receiveTime= " + System.currentTimeMillis());    }}
server.port=8081spring.cloud.stream.rocketmq.binder.name-server=192.168.0.118:9876spring.cloud.stream.bindings.input.destination=TopicTestspring.cloud.stream.rocketmq.bindings.input.producer.group=demo-group
spring.cloud.stream.bindings.orderInput.destination=TopicOrderspring.cloud.stream.rocketmq.bindings.orderInput.producer.group=order-group
   复制代码
 
评论