Spring Cloud Stream 是什么?
它是什么
Spring Cloud Stream 是一个构建高度可扩展的事件驱动微服务的框架,与共享消息系统相连。
该框架提供了一个灵活的编程模型,它建立在已经建立和熟悉的 Spring
用法和最佳实践之上,包括支持持久化的 pub/sub
语义、消费者组和有状态分区。
绑定的一些实现
Spring Cloud Stream 支持多种绑定实现,下表包括了 GitHub 项目的链接。
Spring Cloud Stream 的核心构件是:
Destination Binders: 负责提供与外部消息系统集成的组件。
Destination Bindings: 作为消息中间件与应用程序的提供者和消费者之间的桥梁。
Message: 生产者和消费者用于与目的地装订器沟通的典型数据结构(从而通过外部消息系统与其他应用程序进行通信的典型数据结构)。
为什么用 Cloud Stream?
解耦。使用了 SCS 之后,我们只需要在配置文件中配置下对应的中间件服务器地址等信息,然后就可使用,使得业务中不需要出现具体的消息中间件。
便于迁移。例如项目中一开始使用的是 rabbitmq,后期要想迁移成 kafka 的话,如果使用传统方式,在使用的地方使用具体消息中间件的话,那么迁移的成本会很高,而使用 SCS 的话,只需要更改配置文件即可。
如何使用?(集成 rocket mq )
配置 JAVA_HOME 路径(以下为 mac 环境下的配置)
chmod 777 /etc/profile
sudo vim /etc/profile
export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_162.jdk/Contents/Home
source /etc/profile
安装启动 Rocket MQ
从官网 下载二进制文件
启动 nameserver nohup sh bin/mqnamesrv &
启动 broker nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true &
设置 nameserver 地址 export NAMESRV_ADDR=localhost:9876
生产者发送消息 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
消费者消费消息 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
关闭 broker sh bin/mqshutdown broker
关闭 nameserver sh bin/mqshutdown namesrv
提示:
如果没有执行 export NAMESRV_ADDR=localhost:9876
会导致 java.lang.IllegalStateException: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to null failed
集成 SCS
通过 https://start.spring.io/ 创建一个初始化项目
这里贴出 pom
文件配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.6.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>org.example</groupId>
<artifactId>mq</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Hoxton.SR4</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
<version>2.2.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
复制代码
创建 CustomerChannel
public interface CustomerChannel {
/**
* 这里的名称对应了spring.cloud.stream.rocketmq.bindings.<channelName>
*/
String OUTPUT = "my-output";
String INPUT = "my-input";
@Output(CustomerChannel.OUTPUT)
MessageChannel output();
@Input(CustomerChannel.INPUT)
SubscribableChannel input();
}
复制代码
定义 TestController
@RestController
@EnableBinding({CustomerChannel.class})
public class TestController {
private final CustomerChannel customerChannel;
public TestController(CustomerChannel customerChannel) {
this.customerChannel = customerChannel;
}
/**
* 使用一个controller断点模拟发送消息,可以在setHeader方法中设置header来实现消息过滤
*/
@PostMapping("/message-send")
public String testCustomInterfaceSendMsg() {
Message<String> message = MessageBuilder.withPayload("send message")
.setHeader(RocketMQHeaders.TAGS, "tag2")
.setHeader("mytag", "my-tag")
.build();
this.customerChannel.output().send(message);
Message<String> message2 = MessageBuilder.withPayload("send message")
.setHeader(RocketMQHeaders.TAGS, "tag3")
.setHeader("mytag", "your-tag")
.build();
this.customerChannel.output().send(message2);
return "success";
}
/**
* 使用@StreamListener来监听消息
*/
@StreamListener(value = CustomerChannel.INPUT, condition = "headers['mytag']=='my-tag'")
public void testCustomListener(Message message) {
System.out.println(message.getHeaders().get("TAGS") + " " + message.getPayload().toString());
}
/**
* 使用@StreamListener来监听消息
*/
@StreamListener(value = CustomerChannel.INPUT, condition = "headers['mytag']=='your-tag'")
public void testCustomListenerFilter(Message message) {
System.out.println(message.getHeaders().get("TAGS") + " " + message.getPayload().toString());
}
}
复制代码
配置 application.yml
spring:
cloud:
stream:
rocketmq:
binder:
name-server: localhost:9876
enable-msg-trace: true
bindings:
my-input:
consumer:
tags: tag2 || tag1 || tag3 || tag4 # tag 为 tag1/tag2/tag3/tag4
bindings:
my-input:
destination: my-stream-topic # 相当于 rocketmq 的 topic
group: my-stream-group
binder: rocketmq #
consumer:
instanceCount: 1 # 指定实例数量
my-output:
destination: my-stream-topic # 相当于 rocketmq 的 topic
复制代码
运行 MQApplication
,使用 POST 方法请求 localhost:8989/message-send
核心原理
消息发送和消费的流程:
消息通过 MessageChannel(output)
进行发送,AbstractMessageChannel
实现了 MessageChannel
, AbstractSubscribableChannel
继承了 AbstractMessageChannel
并且实现了 SubscribableChannel
,重写了其中的 subscribe
方法,subscribe()
指定了 MessageHandler
,最终会调用 RocketMQMessageHandler
发送消息
消息发出之后,对应的消息中间件内部会有通道适配器,将中间件特有的消息格式转换为 SpringMessage,然后发送到 MessageChannel(input)
StreamListener
订阅了对应的 input ,根据一定的条件,就能收到消费者发出的消息。
本文参考
https://github.com/alibaba/spring-cloud-alibaba/wiki/RocketMQ-en
http://rocketmq.apache.org/docs/quick-start/
RocketMQ 和 Spring Cloud Stream
评论