Spring Cloud Stream 是什么?
它是什么
Spring Cloud Stream 是一个构建高度可扩展的事件驱动微服务的框架,与共享消息系统相连。
该框架提供了一个灵活的编程模型,它建立在已经建立和熟悉的 Spring 用法和最佳实践之上,包括支持持久化的 pub/sub 语义、消费者组和有状态分区。
绑定的一些实现
Spring Cloud Stream 支持多种绑定实现,下表包括了 GitHub 项目的链接。
Spring Cloud Stream 的核心构件是:
Destination Binders: 负责提供与外部消息系统集成的组件。
Destination Bindings: 作为消息中间件与应用程序的提供者和消费者之间的桥梁。
Message: 生产者和消费者用于与目的地装订器沟通的典型数据结构(从而通过外部消息系统与其他应用程序进行通信的典型数据结构)。
为什么用 Cloud Stream?
解耦。使用了 SCS 之后,我们只需要在配置文件中配置下对应的中间件服务器地址等信息,然后就可使用,使得业务中不需要出现具体的消息中间件。
便于迁移。例如项目中一开始使用的是 rabbitmq,后期要想迁移成 kafka 的话,如果使用传统方式,在使用的地方使用具体消息中间件的话,那么迁移的成本会很高,而使用 SCS 的话,只需要更改配置文件即可。
如何使用?(集成 rocket mq )
配置 JAVA_HOME 路径(以下为 mac 环境下的配置)
chmod 777 /etc/profile
sudo vim /etc/profile
export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_162.jdk/Contents/Home
source /etc/profile
安装启动 Rocket MQ
从官网 下载二进制文件
启动 nameserver nohup sh bin/mqnamesrv &
启动 broker nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &
设置 nameserver 地址 export NAMESRV_ADDR=localhost:9876
生产者发送消息 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
消费者消费消息 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
关闭 broker sh bin/mqshutdown broker
关闭 nameserver sh bin/mqshutdown namesrv
提示:
如果没有执行 export NAMESRV_ADDR=localhost:9876
会导致 java.lang.IllegalStateException: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed
集成 SCS
通过 https://start.spring.io/ 创建一个初始化项目
这里贴出 pom 文件配置
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.6.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>org.example</groupId> <artifactId>mq</artifactId> <version>1.0-SNAPSHOT</version>
<properties> <java.version>1.8</java.version> <spring-cloud.version>Hoxton.SR4</spring-cloud.version> </properties>
<dependencies> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId> <version>2.2.1.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-test-support</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency> </dependencies>
<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
<build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build></project>
复制代码
创建 CustomerChannel
public interface CustomerChannel {
/** * 这里的名称对应了spring.cloud.stream.rocketmq.bindings.<channelName> */ String OUTPUT = "my-output"; String INPUT = "my-input";
@Output(CustomerChannel.OUTPUT) MessageChannel output();
@Input(CustomerChannel.INPUT) SubscribableChannel input();}
复制代码
定义 TestController
@RestController@EnableBinding({CustomerChannel.class})public class TestController {
private final CustomerChannel customerChannel;
public TestController(CustomerChannel customerChannel) { this.customerChannel = customerChannel; }
/** * 使用一个controller断点模拟发送消息,可以在setHeader方法中设置header来实现消息过滤 */ @PostMapping("/message-send") public String testCustomInterfaceSendMsg() { Message<String> message = MessageBuilder.withPayload("send message") .setHeader(RocketMQHeaders.TAGS, "tag2") .setHeader("mytag", "my-tag") .build();
this.customerChannel.output().send(message);
Message<String> message2 = MessageBuilder.withPayload("send message") .setHeader(RocketMQHeaders.TAGS, "tag3") .setHeader("mytag", "your-tag") .build();
this.customerChannel.output().send(message2);
return "success"; }
/** * 使用@StreamListener来监听消息 */ @StreamListener(value = CustomerChannel.INPUT, condition = "headers['mytag']=='my-tag'") public void testCustomListener(Message message) { System.out.println(message.getHeaders().get("TAGS") + " " + message.getPayload().toString()); }
/** * 使用@StreamListener来监听消息 */ @StreamListener(value = CustomerChannel.INPUT, condition = "headers['mytag']=='your-tag'") public void testCustomListenerFilter(Message message) { System.out.println(message.getHeaders().get("TAGS") + " " + message.getPayload().toString()); }}
复制代码
配置 application.yml
spring: cloud: stream: rocketmq: binder: name-server: localhost:9876 enable-msg-trace: true bindings: my-input: consumer: tags: tag2 || tag1 || tag3 || tag4 # tag 为 tag1/tag2/tag3/tag4 bindings: my-input: destination: my-stream-topic # 相当于 rocketmq 的 topic group: my-stream-group binder: rocketmq # consumer: instanceCount: 1 # 指定实例数量 my-output: destination: my-stream-topic # 相当于 rocketmq 的 topic
复制代码
运行 MQApplication,使用 POST 方法请求 localhost:8989/message-send
核心原理
消息发送和消费的流程:
消息通过 MessageChannel(output) 进行发送,AbstractMessageChannel 实现了 MessageChannel, AbstractSubscribableChannel 继承了 AbstractMessageChannel 并且实现了 SubscribableChannel,重写了其中的 subscribe 方法,subscribe() 指定了 MessageHandler,最终会调用 RocketMQMessageHandler 发送消息
消息发出之后,对应的消息中间件内部会有通道适配器,将中间件特有的消息格式转换为 SpringMessage,然后发送到 MessageChannel(input)
StreamListener 订阅了对应的 input ,根据一定的条件,就能收到消费者发出的消息。
本文参考
https://github.com/alibaba/spring-cloud-alibaba/wiki/RocketMQ-en
http://rocketmq.apache.org/docs/quick-start/
RocketMQ 和 Spring Cloud Stream
评论