SpringCloud-Stream 实战快速入门
POM
主要增加了 org.springframework.cloud:spring-cloud-starter-stream-rocketmq 依赖,
老顾采用最新 Spring Boot 的 2.1.8.RELEASE 版本,SpringCloud 的 Greenwich.SR2 版本;Spring-Cloud-Alibaba 的 2.1.0.RELEASE 版本。
增加依赖
EnableBinding 配置
我们需要通过在配置类上使用 @EnableBinding 指定需要使用的 Binding,它指定的是一个接口,在对应接口中会定义一些标注了 @Input 或 @Output 的方法,它们就对应一个 Binding 了。
@Output 注解对应的是 org.springframework.messaging.MessageChannel,代表发布者 @Input 注解对应的是 org.springframework.messaging.SubscribableChannel,代表消费者
Source 内置接口
org.springframework.cloud.stream.messaging.Source 是内置的 Output 接口
小伙伴们也可以不用内置的,模仿 Source 自行定义就行了
它定义了一个 OUTPUT 类型的 Binding,名称为 output,当不通过 @Output 指定 Binding 的名称时,默认会使用方法名作为 Binding 的名称。
Sink 内置接口
Sink 的定义如下,它定义了一个 INPUT 类型的 Binding,名称为 input,当不通过 @Input 指定 Binding 的名称时,默认会使用方法名作为 Binding 的名称。
在一个接口中可以同时定义多个 Binding,只需要定义多个 @Input 或 @Output 标注的方法。Processor 接口同时继承了 Source 和 Sink 接口,所以当 @EnableBinding 指定了 Processor 接口时相当于同时应用了两个 Binding。
public interface Processor extends Source, Sink {}@EnableBinding({ Processor.class })
生产者
我们来定义一个 output 类型的 Binding![image](https://upload-images.jianshu.io/upload_images/15590149-d2aba9ebdc7090eb?imageMogr2/auto-o
rient/strip%7CimageView2/2/w/1240)
在上面代码中我们指定了 @EnableBinding 接口为 Source 接口,即启用了名称为 output 的 OUTPUT 类型的 Binding。Spring Cloud 会自动实现该 Binding 的实现,也会提供 Binding 接口的实现,并注册到 bean 容器中。即可以在程序中自动注入 Source 类型的 bean,也可以注入 MessageChannel 类型的 bean。
上面定义了一个生产发布服务,直接注入 Source 类型的 bean,然后通过 Source 的 output()获取 MessageChannel 实例,通过它的 send()方法进行消息发送。
另一种使用方式,就是直接获取 MessageChannel,如下代码,效果是一样的。
那发送的消息究竟会发送到哪里呢?这就需要我们来定义对应的 Binding 和实际消息容器的生产者的映射了。可以通过 spring.cloud.stream.bindings.<bindingName>.*的形式定义 Binding 的一些属性。
具体有什么属性可查看 org.springframework.cloud.stream.config.BindingProperties
这里我们通过其 destination 属性指定该 Binding 对应的实际的目的地,对应于 RocketMQ 就是一个 Topic。
spring.cloud.stream.bindings.output.destination=test-topic
即我们上面发送的消息将发到 RocketMQ 的名为 test-topic 的 Topic。RocketMQ 是需要指定 NameServer 的,所以在发送消息前,还需要基于 RocketMQ 这个 Binder 配置其 NameServer 的地址。
spring.cloud.stream.rocketmq.binder.namesrv-addr=192.168.31.153:9876
在启动了 RocketMQ 的 NameServer 和 Broker 之后,就可以利用上面的代码进行消息发送了。测试代码如下。
在测试的时候可以在启动 RocketMQ 时指定 autoCreateTopicEnable=true 以开启自动创建 Topic 的功能,如 mqbroker -n localhost:9876 autoCreateTopicEnable=true。
继承 CommandLineRunner 接口,启动就会执行 run 方法,就是调用 ProviderService 发送消息。到 Rocketmq 控制台查看消息
消费者
消费者接收消息和生产者类似,也需要定义相应的 Binding,也需要通过 @EnableBinding 进行指定。Spring Cloud 的 Sink 接口中已经定义好一个名为 input 的 Binding,如果只需要一个接收 Binding,可以直接拿来用。
作为消费者的 Binding 也必须指定对应的目的地,还必须指定一个消费者分组 group,相同 group 的消费者可以共同消费相同 destination 的消息,分担压力。
比如一个作为消费者的应用部署了三份,它们的 group 都是一样的,如果来了三条消息,那么可能三台应用都分别消费了其中的一条消息。而如果部署三份的 group 都不一样,则每台应用都将消费全部的三条消息。
spring.cloud.stream.bindings.input.destination=test-topicspring.cloud.stream.bindings.input.group=test-group
一般生产中 group 的名字用项目工程的名字
spring.cloud.stream.bindings.input.group=${spring.application.name}
评论