写点什么

spring-cloud-stream 集成 rocketmq

发布于: 2020 年 04 月 30 日
spring-cloud-stream 集成 rocketmq

Spring Cloud Stream 是什么?

它是什么

Spring Cloud Stream 是一个构建高度可扩展的事件驱动微服务的框架,与共享消息系统相连。

该框架提供了一个灵活的编程模型,它建立在已经建立和熟悉的 Spring 用法和最佳实践之上,包括支持持久化的 pub/sub 语义、消费者组和有状态分区。

绑定的一些实现

Spring Cloud Stream 支持多种绑定实现,下表包括了 GitHub 项目的链接。


Spring Cloud Stream 的核心构件是:


  • Destination Binders: 负责提供与外部消息系统集成的组件。

  • Destination Bindings: 作为消息中间件与应用程序的提供者和消费者之间的桥梁。

  • Message: 生产者和消费者用于与目的地装订器沟通的典型数据结构(从而通过外部消息系统与其他应用程序进行通信的典型数据结构)。

为什么用 Cloud Stream?

  1. 解耦。使用了 SCS 之后,我们只需要在配置文件中配置下对应的中间件服务器地址等信息,然后就可使用,使得业务中不需要出现具体的消息中间件。

  2. 便于迁移。例如项目中一开始使用的是 rabbitmq,后期要想迁移成 kafka 的话,如果使用传统方式,在使用的地方使用具体消息中间件的话,那么迁移的成本会很高,而使用 SCS 的话,只需要更改配置文件即可。

如何使用?(集成 rocket mq )

配置 JAVA_HOME 路径(以下为 mac 环境下的配置)

  1. chmod 777 /etc/profile

  2. sudo vim /etc/profile

  3. export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_162.jdk/Contents/Home

  4. source /etc/profile

安装启动 Rocket MQ

  1. 从官网 下载二进制文件

  2. 启动 nameserver nohup sh bin/mqnamesrv &

  3. 启动 broker nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &

  4. 设置 nameserver 地址 export NAMESRV_ADDR=localhost:9876

  5. 生产者发送消息 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

  6. 消费者消费消息 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

  7. 关闭 broker sh bin/mqshutdown broker

  8. 关闭 nameserver sh bin/mqshutdown namesrv


提示:

如果没有执行 export NAMESRV_ADDR=localhost:9876

会导致 java.lang.IllegalStateException: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed

集成 SCS

  1. 通过 https://start.spring.io/ 创建一个初始化项目

  2. 这里贴出 pom 文件配置

<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">    <modelVersion>4.0.0</modelVersion>    <parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>2.2.6.RELEASE</version>        <relativePath/> <!-- lookup parent from repository -->    </parent>    <groupId>org.example</groupId>    <artifactId>mq</artifactId>    <version>1.0-SNAPSHOT</version>
<properties> <java.version>1.8</java.version> <spring-cloud.version>Hoxton.SR4</spring-cloud.version> </properties>
<dependencies> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> <version>2.2.1.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-test-support</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> </dependencies>
<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
<build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build></project>
复制代码
  1. 创建 CustomerChannel

public interface CustomerChannel {
/** * 这里的名称对应了spring.cloud.stream.rocketmq.bindings.<channelName> */ String OUTPUT = "my-output"; String INPUT = "my-input";
@Output(CustomerChannel.OUTPUT) MessageChannel output();
@Input(CustomerChannel.INPUT) SubscribableChannel input();}
复制代码
  1. 定义 TestController

@RestController@EnableBinding({CustomerChannel.class})public class TestController {
private final CustomerChannel customerChannel;
public TestController(CustomerChannel customerChannel) { this.customerChannel = customerChannel; }
/** * 使用一个controller断点模拟发送消息,可以在setHeader方法中设置header来实现消息过滤 */ @PostMapping("/message-send") public String testCustomInterfaceSendMsg() { Message<String> message = MessageBuilder.withPayload("send message") .setHeader(RocketMQHeaders.TAGS, "tag2") .setHeader("mytag", "my-tag") .build();
this.customerChannel.output().send(message);
Message<String> message2 = MessageBuilder.withPayload("send message") .setHeader(RocketMQHeaders.TAGS, "tag3") .setHeader("mytag", "your-tag") .build();
this.customerChannel.output().send(message2);
return "success"; }
/** * 使用@StreamListener来监听消息 */ @StreamListener(value = CustomerChannel.INPUT, condition = "headers['mytag']=='my-tag'") public void testCustomListener(Message message) { System.out.println(message.getHeaders().get("TAGS") + " " + message.getPayload().toString()); }
/** * 使用@StreamListener来监听消息 */ @StreamListener(value = CustomerChannel.INPUT, condition = "headers['mytag']=='your-tag'") public void testCustomListenerFilter(Message message) { System.out.println(message.getHeaders().get("TAGS") + " " + message.getPayload().toString()); }}
复制代码
  1. 配置 application.yml

spring:  cloud:    stream:      rocketmq:        binder:          name-server: localhost:9876          enable-msg-trace: true        bindings:          my-input:            consumer:              tags: tag2 || tag1 || tag3 || tag4 # tag 为 tag1/tag2/tag3/tag4      bindings:        my-input:          destination: my-stream-topic # 相当于 rocketmq 的 topic          group: my-stream-group          binder: rocketmq #          consumer:            instanceCount: 1 # 指定实例数量        my-output:          destination: my-stream-topic # 相当于 rocketmq 的 topic
复制代码
  1. 运行 MQApplication,使用 POST 方法请求 localhost:8989/message-send

核心原理

消息发送和消费的流程:

  1. 消息通过 MessageChannel(output) 进行发送,AbstractMessageChannel 实现了 MessageChannelAbstractSubscribableChannel 继承了 AbstractMessageChannel 并且实现了 SubscribableChannel,重写了其中的 subscribe 方法,subscribe() 指定了 MessageHandler,最终会调用 RocketMQMessageHandler 发送消息

  2. 消息发出之后,对应的消息中间件内部会有通道适配器,将中间件特有的消息格式转换为 SpringMessage,然后发送到 MessageChannel(input)

  3. StreamListener 订阅了对应的 input ,根据一定的条件,就能收到消费者发出的消息。



本文参考

https://github.com/alibaba/spring-cloud-alibaba/wiki/RocketMQ-en

http://rocketmq.apache.org/docs/quick-start/

RocketMQ 和 Spring Cloud Stream


发布于: 2020 年 04 月 30 日阅读数: 193
用户头像

还未添加个人签名 2017.11.30 加入

还未添加个人简介

评论

发布
暂无评论
spring-cloud-stream 集成 rocketmq