rocketmq 整合 SpringCloudStream
rocketmq 整合 SpringCloudStream
发送消息
@EnableBinding({ Source.class })表示绑定配置文件中名称为 output 的消息通道 Binding,Source 类中定义的消息通道名称为 output。
消费消息:
name-server 是 RocketMq 的 NameServer 地址,destination 指定 Topic 名称,指定名称为 input 的 Binding 接收 TopicTest 的消息
消息监听:
@EnableBinding({ Sink.class})表示绑定配置文件名称为 input 的消息通道 Binding,Sink 类中定义的消息通道名称为 input,@StreamListener 表示定义一个消息监听器,接收 RocketMQ 中的消息。
Spring Cloud Stream
Spring Cloud Stream 是构建与共享消息传递系统连接的高度可伸缩的事件驱动微服务,目的是简化消息业务在 Spring Cloud 应用程序中的开发。
通过 Spring Cloud Stream 注入的输入通道 inputs 和输出通道 outputs 与消息中间件 Middleware 通信,消息通道通过特定中间件绑定器 Binder 实现连接到外部代理。
Spring Cloud Stream 实现基于发布/订阅机制,核心四个部分组成:Spring Framework 中的 Spring Messaging 和 Spring Integration,Spring Cloud Stream 中的 Binders 和 Bindings。
Spring Messaging:Spring Framework 中的统一消息编程模型
Message:消息对象,包含消息头 Header 和消息体 Payload
MessageChannel:消息通道接口,用于接收消息,提供 send 方法将消息发送至消息通道。
MessageHandler:消息处理器接口,用于处理消息逻辑。
Spring Integration:支持企业集成的扩展机制,提供简单的模型来构建企业集成解决方案,对 Spring Messaging 进行扩展。
MessageDispatcher:消息分发接口,用于分发消息和添加删除消息处理器
MessageRouter:消息路由接口,定义默认的输出消息通道。
Filter:消息过滤注解,用于配置消息过滤表达式
Aggregator:消息的聚合注解,用于将一条消息拆分成多条。
Splitter:消息分割,用于将一条消息拆分成多条。
Binders:目标绑定器,负责与外部消息中间件系统集成的组件。
doBindProducer:绑定消息中间件客户端发送消息模块。
doBindConsumer:绑定消息中间件客户端接收消息模块。
Bindings:外部消息中间件系统与应用程序提供的消息生产者和消费者之间的桥梁。
Spring Cloud Alibaba RocketMQ 架构图
MessageChannel(output):消息通道,用于发送消息,Spring Cloud Stream 的标准接口
MessageChannel(input):消息通道,用于订阅消息,Spring Cloud Stream 的标准接口
Binder bindProducer:目标绑定器,将发送通道发过来的消息发送到 RocketMQ 消息服务器
Binder bindConsumer:目标绑定器,将接收到 RocketMQ 消息服务器的消息推送给订阅通道
spring-cloud-stream 官网:
版权声明: 本文为 InfoQ 作者【周杰伦本人】的原创文章。
原文链接:【http://xie.infoq.cn/article/f47bb2c774d3d74a916f12cfd】。文章转载请联系作者。
评论