写点什么

6 个 Spring messaging 注解:整体架构分析与注解应用案例 (必须收藏)

作者:肖哥弹架构
  • 2024-09-07
    河北
  • 本文字数:5319 字

    阅读完需:约 17 分钟

6个 Spring messaging注解:整体架构分析与注解应用案例(必须收藏)

Spring Messaging 是 Spring Framework 的一个关键模块,它通过提供一系列注解如 @EnableMessaging@MessageEndpoint@JmsListener@MessageMapping@SendTo@SubscribeEvent@MessageExceptionHandler,简化了消息驱动的应用程序开发。这些注解支持异步消息处理、事件发布-订阅模式,并与 Spring 的其他模块如 Spring Boot 和 Spring Integration 高度集成,使得构建复杂的消息系统变得简单而直观。


肖哥弹架构 跟大家“弹弹” 框架注解使用,需要代码关注

欢迎 点赞,关注,评论。

关注公号 Solomon 肖哥弹架构获取更多精彩内容

历史热点文章

Spring Messaging 架构图


1. 启用消息传递基础设施

@EnableMessaging

  • 注解作用介绍


@EnableMessaging 注解用于配置类上,启用 Spring Messaging 的自动配置,是开始使用 Spring Messaging 的第一步。


  • 注解属性介绍


无特定属性。


  • 注解业务案例


@Configuration@EnableMessagingpublic class MessagingConfiguration {    @Bean    public MessageConverter messageConverter() {        return new MappingJackson2MessageConverter();    }}
复制代码

2. 配置消息中间件支持

@EnableJms

  • 注解作用介绍


@EnableJms 注解用于启用 Java Message Service (JMS) 的支持。


  • 注解属性介绍


无特定属性。


  • 注解业务案例


@Configuration@EnableJmspublic class JmsConfiguration implements JmsMessagingConfigurer {    @Override    public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {        registrar.setContainerFactory("myFactory");    }
@Bean public DefaultJmsListenerContainerFactory myFactory(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setConcurrency("5"); return factory; }}
复制代码

@EnableKafka

  • 注解作用介绍


@EnableKafka 注解用于启用 Apache Kafka 的支持。


  • 注解属性介绍


无特定属性。


  • 注解业务案例


@Configuration@EnableKafkapublic class KafkaConfiguration {    @Bean    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {        ConcurrentKafkaListenerContainerFactory<String, String> factory =                new ConcurrentKafkaListenerContainerFactory<>();        factory.setConsumerFactory(consumerFactory());        return factory;    }
@Bean public ConsumerFactory<String, String> consumerFactory() { // Configure consumer properties return new DefaultKafkaConsumerFactory<>(consumerConfigs()); }
@Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup"); // Additional configurations return props; }}
复制代码

@EnableRabbit

  • 注解作用介绍


@EnableRabbit 注解用于启用 RabbitMQ 的支持。


  • 注解属性介绍


无特定属性。


  • 注解业务案例


@Configuration@EnableRabbitpublic class RabbitConfiguration implements RabbitMessagingConfigurer {    @Override    public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {        registrar.setContainerFactory("myFactory");    }
@Bean public SimpleRabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setConcurrentConsumers(5); return factory; }}
复制代码

3. 定义消息监听器

@JmsListener

  • 注解作用介绍


@JmsListener 注解用于方法上,定义一个方法作为 JMS 消息的监听器,指定消息的目的地。


  • 注解属性介绍

  • destination: 指定消息目的地。

  • containerFactory: 指定消息监听器容器工厂。

  • 注解业务案例


@Componentpublic class OrderMessageListener {    @JmsListener(destination = "orders", containerFactory = "myFactory")    public void receiveOrderMessage(String message) {        // 将接收到的订单信息进行处理        processOrder(message);    }
private void processOrder(String orderDetails) { // 订单处理逻辑 }}
复制代码

4. 消息端点配置

@MessageEndpoint

  • 注解作用介绍


@MessageEndpoint 注解用于类上,标识一个类为消息端点,可以包含多个消息处理方法。


  • 注解属性介绍


无特定属性。


  • 注解业务案例


@Component@MessageEndpointpublic class MessageEndpointExample {    @JmsListener(destination = "greetings")    public void handleGreeting(String greeting) {        System.out.println("Received greeting: " + greeting);    }
// 可以有多个 @JmsListener 方法}
复制代码

5. 映射消息到处理程序

@MessageMapping

  • 注解作用介绍


@MessageMapping 注解用于方法上,类似于@RequestMapping,但它用于映射 STOMP 消息到 WebSocket 消息处理程序。


  • 注解属性介绍

  • value: 指定消息的目的地。

  • 注解业务案例


@Controllerpublic class WebSocketGreetingController {    @MessageMapping("/greeting")    public Greeting greeting(GreetingMessage message) {        return new GreetingMessage("Hello, " + message.getName());    }}
public class GreetingMessage { private String name;
// 构造函数、getter 和 setter 省略}
复制代码

6. 消息处理方法的参数绑定

@Payload

  • 注解作用介绍


@Payload 注解用于方法参数上,将传入消息的有效负载绑定到方法参数。


  • 注解属性介绍


无特定属性。


  • 注解业务案例


public class MessagePayloadHandler {    public void handlePayload(@Payload MyPayload payload) {        // 使用payload中的数据        processData(payload.getData());    }
private void processData(MyData data) { // 数据处理逻辑 }}
复制代码

@Header

  • 注解作用介绍


@Header 注解用于方法参数上,将消息头绑定到方法参数。


  • 注解属性介绍

  • value: 指定要绑定的头的名称。

  • 注解业务案例


public class MessageHeaderHandler {    public void handleMessage(@Header("messageType") String messageType) {        // 根据消息类型进行处理        handleBasedOnType(messageType);    }
private void handleBasedOnType(String type) { // 类型特定处理逻辑 }}
复制代码

7. 消息处理异常处理

@MessageExceptionHandler

  • 注解作用介绍


@MessageExceptionHandler 注解用于方法上,处理消息处理过程中发生的异常。


  • 注解属性介绍


无特定属性。


  • 注解业务案例


public class MessageExceptionHandler {    @MessageExceptionHandler    public void handleException(Exception ex) {        // 记录日志或执行其他异常处理操作        logError(ex);    }
private void logError(Exception ex) { // 错误记录逻辑 }}
复制代码

8. 事件订阅

@SubscribeEvent

  • 注解作用介绍


@SubscribeEvent 注解用于方法上,标记方法订阅特定的事件或消息。


  • 注解属性介绍


无特定属性。


  • 注解业务案例


public class EventSubscriber {    @SubscribeEvent    public void handleCustomEvent(MyCustomEvent event) {        // 处理自定义事件        processEvent(event);    }
private void processEvent(MyCustomEvent event) { // 事件处理逻辑 }}
复制代码

9. 消息发送目的地配置

@SendTo

  • 注解作用介绍


@SendTo 注解用于方法上,指定方法的返回值应该发送到的目的地,可以是消息队列或主题。


  • 注解属性介绍

  • value: 指定消息发送的目的地。

  • 注解业务案例


public class MessageSender {    @SendTo("/topic/responses")    public String processAndSend(String input) {        // 处理消息并返回响应        String response = "Response to " + input;        // 发送响应到指定目的地        return response;    }}
复制代码

10、综合性注解应用案例

1. 项目结构

  • OrderApplication.java - Spring Boot 启动类

  • MessagingConfig.java - Spring Messaging 配置类

  • OrderService.java - 订单业务逻辑服务

  • OrderRepository.java - 订单数据访问层

  • Order.java - 订单实体

  • OrderEvent.java - 订单事件

2. Spring Boot 启动类

@SpringBootApplication@EnableJms@EnableKafka@EnableRabbitpublic class OrderApplication {    public static void main(String[] args) {        SpringApplication.run(OrderApplication.class, args);    }}
复制代码

3. Spring Messaging 配置类

@Configurationpublic class MessagingConfig implements WebSocketMessageBrokerConfigurer {
@Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS(); }
@Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableSimpleBroker("/topic", "/queue"); registry.setApplicationDestinationPrefixes("/app"); }
@Bean public JmsListenerContainerFactory<?> jmsListenerContainerFactory( ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setConcurrency("5"); return factory; }
// Kafka和RabbitMQ的配置省略}
复制代码

4. 订单业务逻辑服务

@Servicepublic class OrderService {
@Autowired private OrderRepository orderRepository;
public void processOrderCreation(String orderDetails) { Order order = new Order(orderDetails); orderRepository.save(order);
// 发送订单创建事件 rabbitTemplate.convertAndSend("orderCreatedExchange", "order.created", order); }
@JmsListener(destination = "orderQueue", containerFactory = "jmsListenerContainerFactory") public void handleOrderMessage(Order order) { // 处理接收到的订单消息 }
@MessageMapping("/order/update") @SendTo("/topic/orders") public Order updateOrder(@Payload Order order) { // 更新订单信息 orderRepository.save(order); return order; }
@MessageExceptionHandler public void handleMessagingException(Exception ex) { // 记录异常信息 }
@SubscribeEvent public void handleOrderEvent(OrderEvent event) { // 处理订单事件 }}
复制代码

5. 订单数据访问层

@Repositorypublic interface OrderRepository extends JpaRepository<Order, Long> {    @Modifying    @Transactional    void deleteByOrderDetails(String details);}
复制代码

6. 订单实体

@Entity@Table(name = "orders")public class Order {    @Id    @GeneratedValue(strategy = GenerationType.IDENTITY)    private Long id;
private String orderDetails;
// 构造函数、getter 和 setter 省略}
复制代码

7. 订单事件

public class OrderEvent {    private Order order;
// 构造函数、getter 和 setter 省略}
复制代码


  • 启动类 OrderApplication 通过 @SpringBootApplication 注解激活 Spring Boot 应用,并使用 @Enable* 注解启用了 JMS、Kafka 和 RabbitMQ 的支持。

  • MessagingConfig 类配置了 WebSocket 端点、消息代理和消息监听器容器工厂。

  • OrderService 类包含了业务逻辑,使用 @JmsListener 注解监听 JMS 队列,@MessageMapping 注解处理 STOMP 消息,并使用 @SendTo 注解发送消息到主题。同时,它还处理了消息异常和订阅了订单事件。

  • OrderRepository 接口继承了 JpaRepository 并添加了自定义的修改操作。

  • Order 类是一个简单的实体,使用 @Entity 和 @Table 注解映射到数据库。

  • OrderEvent 类用于封装订单事件数据。

发布于: 刚刚阅读数: 4
用户头像

智慧属心窍之锁 2019-05-27 加入

擅长于通信协议、微服务架构、框架设计、消息队列、服务治理、PAAS、SAAS、ACE\ACP、大模型

评论

发布
暂无评论
6个 Spring messaging注解:整体架构分析与注解应用案例(必须收藏)_Java_肖哥弹架构_InfoQ写作社区