写点什么

SpringCloud-Stream 实战快速入门

作者:Java高工P7
  • 2021 年 11 月 11 日
  • 本文字数:1826 字

    阅读完需:约 6 分钟

POM

主要增加了 org.springframework.cloud:spring-cloud-starter-stream-rocketmq 依赖,


老顾采用最新 Spring Boot 的 2.1.8.RELEASE 版本SpringCloud 的 Greenwich.SR2 版本;Spring-Cloud-Alibaba 的 2.1.0.RELEASE 版本。


增加依赖



EnableBinding 配置


我们需要通过在配置类上使用 @EnableBinding 指定需要使用的 Binding,它指定的是一个接口,在对应接口中会定义一些标注了 @Input 或 @Output 的方法,它们就对应一个 Binding 了。


@Output 注解对应的是 org.springframework.messaging.MessageChannel,代表发布者 @Input 注解对应的是 org.springframework.messaging.SubscribableChannel,代表消费者

Source 内置接口

org.springframework.cloud.stream.messaging.Source 是内置的 Output 接口


小伙伴们也可以不用内置的,模仿 Source 自行定义就行了



它定义了一个 OUTPUT 类型的 Binding,名称为 output,当不通过 @Output 指定 Binding 的名称时,默认会使用方法名作为 Binding 的名称。

Sink 内置接口

Sink 的定义如下,它定义了一个 INPUT 类型的 Binding,名称为 input,当不通过 @Input 指定 Binding 的名称时,默认会使用方法名作为 Binding 的名称。



在一个接口中可以同时定义多个 Binding,只需要定义多个 @Input 或 @Output 标注的方法。Processor 接口同时继承了 Source 和 Sink 接口,所以当 @EnableBinding 指定了 Processor 接口时相当于同时应用了两个 Binding。


public interface Processor extends Source, Sink {}@EnableBinding({ Processor.class })

生产者

我们来定义一个 output 类型的 Binding![image](https://upload-images.jianshu.io/upload_images/15590149-d2aba9ebdc7090eb?imageMogr2/auto-o


【一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


rient/strip%7CimageView2/2/w/1240)


在上面代码中我们指定了 @EnableBinding 接口为 Source 接口,即启用了名称为 output 的 OUTPUT 类型的 Binding。Spring Cloud 会自动实现该 Binding 的实现,也会提供 Binding 接口的实现,并注册到 bean 容器中。即可以在程序中自动注入 Source 类型的 bean,也可以注入 MessageChannel 类型的 bean



上面定义了一个生产发布服务,直接注入 Source 类型的 bean,然后通过 Source 的 output()获取 MessageChannel 实例,通过它的 send()方法进行消息发送。


另一种使用方式,就是直接获取 MessageChannel,如下代码,效果是一样的。



那发送的消息究竟会发送到哪里呢?这就需要我们来定义对应的 Binding 和实际消息容器的生产者的映射了。可以通过 spring.cloud.stream.bindings.<bindingName>.*的形式定义 Binding 的一些属性。


具体有什么属性可查看 org.springframework.cloud.stream.config.BindingProperties


这里我们通过其 destination 属性指定该 Binding 对应的实际的目的地对应于 RocketMQ 就是一个 Topic。


spring.cloud.stream.bindings.output.destination=test-topic


即我们上面发送的消息将发到 RocketMQ 的名为 test-topic 的 Topic。RocketMQ 是需要指定 NameServer 的,所以在发送消息前,还需要基于 RocketMQ 这个 Binder 配置其 NameServer 的地址。


spring.cloud.stream.rocketmq.binder.namesrv-addr=192.168.31.153:9876


在启动了 RocketMQ 的 NameServer 和 Broker 之后,就可以利用上面的代码进行消息发送了。测试代码如下。


在测试的时候可以在启动 RocketMQ 时指定 autoCreateTopicEnable=true 以开启自动创建 Topic 的功能,如 mqbroker -n localhost:9876 autoCreateTopicEnable=true。



继承 CommandLineRunner 接口,启动就会执行 run 方法,就是调用 ProviderService 发送消息。到 Rocketmq 控制台查看消息



消费者

消费者接收消息和生产者类似,也需要定义相应的 Binding,也需要通过 @EnableBinding 进行指定。Spring Cloud 的 Sink 接口中已经定义好一个名为 input 的 Binding,如果只需要一个接收 Binding,可以直接拿来用。



作为消费者的 Binding 也必须指定对应的目的地,还必须指定一个消费者分组 group相同 group 的消费者可以共同消费相同 destination 的消息,分担压力。


比如一个作为消费者的应用部署了三份它们的 group 都是一样的,如果来了三条消息,那么可能三台应用都分别消费了其中的一条消息。而如果部署三份的 group 都不一样,则每台应用都将消费全部的三条消息


spring.cloud.stream.bindings.input.destination=test-topicspring.cloud.stream.bindings.input.group=test-group


一般生产中 group 的名字用项目工程的名字


spring.cloud.stream.bindings.input.group=${spring.application.name}

用户头像

Java高工P7

关注

还未添加个人签名 2021.11.08 加入

还未添加个人简介

评论

发布
暂无评论
SpringCloud-Stream实战快速入门