写点什么

rocketmq 整合 SpringCloudStream

作者:周杰伦本人
  • 2022 年 8 月 07 日
  • 本文字数:1907 字

    阅读完需:约 6 分钟

rocketmq 整合 SpringCloudStream

发送消息

<dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-web</artifactId></dependency>
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-stream-rocketmq</artifactId></dependency>
复制代码


spring:  cloud:    stream:      rocketmq:        binder:          name-server: 127.0.0.1:9876      bindings:        output:          destination: TopicTest          group: PRODUCER_GROUP_TOPIC_TEST
复制代码


@SpringBootApplication@EnableBinding({ Source.class })public class Application {
public static void main(String[] args) { SpringApplication.run(Application.class, args); }}
复制代码


@Componentpublic class ProduceController {
@Autowired private Source source;
@PostConstruct private void init() throws InterruptedException { MessageBuilder builder = MessageBuilder.withPayload("init..."); Message message = builder.build(); source.output().send(message); System.out.println("init..."); }}
复制代码


@EnableBinding({ Source.class })表示绑定配置文件中名称为 output 的消息通道 Binding,Source 类中定义的消息通道名称为 output。

消费消息:

spring:  cloud:    stream:      rocketmq:        binder:          name-server: 127.0.0.1:9876      bindings:        input:          destination: TopicTest2          group: CONSUER_GROUP_DEMO_1
复制代码


name-server 是 RocketMq 的 NameServer 地址,destination 指定 Topic 名称,指定名称为 input 的 Binding 接收 TopicTest 的消息


消息监听:


@EnableBinding({ Sink.class})@SpringBootApplicationpublic class Application {
@StreamListener(value = InputChannel.ORDER_INPUT) public void receive(String receiveMsg) { System.out.println("receive: " + receiveMsg); }
public static void main(String[] args) { SpringApplication.run(Application.class, args); }}
复制代码


@EnableBinding({ Sink.class})表示绑定配置文件名称为 input 的消息通道 Binding,Sink 类中定义的消息通道名称为 input,@StreamListener 表示定义一个消息监听器,接收 RocketMQ 中的消息。

Spring Cloud Stream

Spring Cloud Stream 是构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务,目的是简化消息业务在 Spring Cloud 应用程序中的开发。


通过 Spring Cloud Stream 注入的输入通道 inputs 和输出通道 outputs 与消息中间件 Middleware 通信,消息通道通过特定中间件绑定器 Binder 实现连接到外部代理。


Spring Cloud Stream 实现基于发布/订阅机制,核心四个部分组成:Spring Framework 中的 Spring Messaging 和 Spring Integration,Spring Cloud Stream 中的 Binders 和 Bindings。


Spring Messaging:Spring Framework 中的统一消息编程模型


  • Message:消息对象,包含消息头 Header 和消息体 Payload

  • MessageChannel:消息通道接口,用于接收消息,提供 send 方法将消息发送至消息通道。

  • MessageHandler:消息处理器接口,用于处理消息逻辑。


Spring Integration:支持企业集成的扩展机制,提供简单的模型来构建企业集成解决方案,对 Spring Messaging 进行扩展。


  • MessageDispatcher:消息分发接口,用于分发消息和添加删除消息处理器

  • MessageRouter:消息路由接口,定义默认的输出消息通道。

  • Filter:消息过滤注解,用于配置消息过滤表达式

  • Aggregator:消息的聚合注解,用于将一条消息拆分成多条。

  • Splitter:消息分割,用于将一条消息拆分成多条。


Binders:目标绑定器,负责与外部消息中间件系统集成的组件。


  • doBindProducer:绑定消息中间件客户端发送消息模块。

  • doBindConsumer:绑定消息中间件客户端接收消息模块。


Bindings:外部消息中间件系统与应用程序提供的消息生产者和消费者之间的桥梁。




Spring Cloud Alibaba RocketMQ 架构图


  • MessageChannel(output):消息通道,用于发送消息,Spring Cloud Stream 的标准接口

  • MessageChannel(input):消息通道,用于订阅消息,Spring Cloud Stream 的标准接口

  • Binder bindProducer:目标绑定器,将发送通道发过来的消息发送到 RocketMQ 消息服务器

  • Binder bindConsumer:目标绑定器,将接收到 RocketMQ 消息服务器的消息推送给订阅通道


spring-cloud-stream 官网:


https://docs.spring.io/spring-cloud-stream/docs/3.1.4/reference/html/spring-cloud-stream.html#spring-cloud-stream-reference

发布于: 刚刚阅读数: 5
用户头像

还未添加个人签名 2020.02.29 加入

公众号《盼盼小课堂》,多平台优质博主

评论

发布
暂无评论
rocketmq整合SpringCloudStream_8月月更_周杰伦本人_InfoQ写作社区