Spring Messaging 是 Spring Framework 的一个关键模块,它通过提供一系列注解如 @EnableMessaging、@MessageEndpoint、@JmsListener、@MessageMapping、@SendTo、@SubscribeEvent 和 @MessageExceptionHandler,简化了消息驱动的应用程序开发。这些注解支持异步消息处理、事件发布-订阅模式,并与 Spring 的其他模块如 Spring Boot 和 Spring Integration 高度集成,使得构建复杂的消息系统变得简单而直观。
肖哥弹架构 跟大家“弹弹” 框架注解使用,需要代码关注
欢迎 点赞,关注,评论。
关注公号 Solomon 肖哥弹架构获取更多精彩内容
历史热点文章
Spring Messaging 架构图
1. 启用消息传递基础设施
@EnableMessaging
@EnableMessaging 注解用于配置类上,启用 Spring Messaging 的自动配置,是开始使用 Spring Messaging 的第一步。
无特定属性。
@Configuration@EnableMessagingpublic class MessagingConfiguration { @Bean public MessageConverter messageConverter() { return new MappingJackson2MessageConverter(); }}
复制代码
2. 配置消息中间件支持
@EnableJms
@EnableJms 注解用于启用 Java Message Service (JMS) 的支持。
无特定属性。
@Configuration@EnableJmspublic class JmsConfiguration implements JmsMessagingConfigurer { @Override public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) { registrar.setContainerFactory("myFactory"); }
@Bean public DefaultJmsListenerContainerFactory myFactory(ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setConcurrency("5"); return factory; }}
复制代码
@EnableKafka
@EnableKafka 注解用于启用 Apache Kafka 的支持。
无特定属性。
@Configuration@EnableKafkapublic class KafkaConfiguration { @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; }
@Bean public ConsumerFactory<String, String> consumerFactory() { // Configure consumer properties return new DefaultKafkaConsumerFactory<>(consumerConfigs()); }
@Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup"); // Additional configurations return props; }}
复制代码
@EnableRabbit
@EnableRabbit 注解用于启用 RabbitMQ 的支持。
无特定属性。
@Configuration@EnableRabbitpublic class RabbitConfiguration implements RabbitMessagingConfigurer { @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) { registrar.setContainerFactory("myFactory"); }
@Bean public SimpleRabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory) { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setConcurrentConsumers(5); return factory; }}
复制代码
3. 定义消息监听器
@JmsListener
@JmsListener 注解用于方法上,定义一个方法作为 JMS 消息的监听器,指定消息的目的地。
@Componentpublic class OrderMessageListener { @JmsListener(destination = "orders", containerFactory = "myFactory") public void receiveOrderMessage(String message) { // 将接收到的订单信息进行处理 processOrder(message); }
private void processOrder(String orderDetails) { // 订单处理逻辑 }}
复制代码
4. 消息端点配置
@MessageEndpoint
@MessageEndpoint 注解用于类上,标识一个类为消息端点,可以包含多个消息处理方法。
无特定属性。
@Component@MessageEndpointpublic class MessageEndpointExample { @JmsListener(destination = "greetings") public void handleGreeting(String greeting) { System.out.println("Received greeting: " + greeting); }
// 可以有多个 @JmsListener 方法}
复制代码
5. 映射消息到处理程序
@MessageMapping
@MessageMapping 注解用于方法上,类似于@RequestMapping,但它用于映射 STOMP 消息到 WebSocket 消息处理程序。
注解属性介绍
value: 指定消息的目的地。
注解业务案例
@Controllerpublic class WebSocketGreetingController { @MessageMapping("/greeting") public Greeting greeting(GreetingMessage message) { return new GreetingMessage("Hello, " + message.getName()); }}
public class GreetingMessage { private String name;
// 构造函数、getter 和 setter 省略}
复制代码
6. 消息处理方法的参数绑定
@Payload
@Payload 注解用于方法参数上,将传入消息的有效负载绑定到方法参数。
无特定属性。
public class MessagePayloadHandler { public void handlePayload(@Payload MyPayload payload) { // 使用payload中的数据 processData(payload.getData()); }
private void processData(MyData data) { // 数据处理逻辑 }}
复制代码
@Header
@Header 注解用于方法参数上,将消息头绑定到方法参数。
注解属性介绍
value: 指定要绑定的头的名称。
注解业务案例
public class MessageHeaderHandler { public void handleMessage(@Header("messageType") String messageType) { // 根据消息类型进行处理 handleBasedOnType(messageType); }
private void handleBasedOnType(String type) { // 类型特定处理逻辑 }}
复制代码
7. 消息处理异常处理
@MessageExceptionHandler
@MessageExceptionHandler 注解用于方法上,处理消息处理过程中发生的异常。
无特定属性。
public class MessageExceptionHandler { @MessageExceptionHandler public void handleException(Exception ex) { // 记录日志或执行其他异常处理操作 logError(ex); }
private void logError(Exception ex) { // 错误记录逻辑 }}
复制代码
8. 事件订阅
@SubscribeEvent
@SubscribeEvent 注解用于方法上,标记方法订阅特定的事件或消息。
无特定属性。
public class EventSubscriber { @SubscribeEvent public void handleCustomEvent(MyCustomEvent event) { // 处理自定义事件 processEvent(event); }
private void processEvent(MyCustomEvent event) { // 事件处理逻辑 }}
复制代码
9. 消息发送目的地配置
@SendTo
@SendTo 注解用于方法上,指定方法的返回值应该发送到的目的地,可以是消息队列或主题。
注解属性介绍
value: 指定消息发送的目的地。
注解业务案例
public class MessageSender { @SendTo("/topic/responses") public String processAndSend(String input) { // 处理消息并返回响应 String response = "Response to " + input; // 发送响应到指定目的地 return response; }}
复制代码
10、综合性注解应用案例
1. 项目结构
OrderApplication.java - Spring Boot 启动类
MessagingConfig.java - Spring Messaging 配置类
OrderService.java - 订单业务逻辑服务
OrderRepository.java - 订单数据访问层
Order.java - 订单实体
OrderEvent.java - 订单事件
2. Spring Boot 启动类
@SpringBootApplication@EnableJms@EnableKafka@EnableRabbitpublic class OrderApplication { public static void main(String[] args) { SpringApplication.run(OrderApplication.class, args); }}
复制代码
3. Spring Messaging 配置类
@Configurationpublic class MessagingConfig implements WebSocketMessageBrokerConfigurer {
@Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS(); }
@Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableSimpleBroker("/topic", "/queue"); registry.setApplicationDestinationPrefixes("/app"); }
@Bean public JmsListenerContainerFactory<?> jmsListenerContainerFactory( ConnectionFactory connectionFactory) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setConcurrency("5"); return factory; }
// Kafka和RabbitMQ的配置省略}
复制代码
4. 订单业务逻辑服务
@Servicepublic class OrderService {
@Autowired private OrderRepository orderRepository;
public void processOrderCreation(String orderDetails) { Order order = new Order(orderDetails); orderRepository.save(order);
// 发送订单创建事件 rabbitTemplate.convertAndSend("orderCreatedExchange", "order.created", order); }
@JmsListener(destination = "orderQueue", containerFactory = "jmsListenerContainerFactory") public void handleOrderMessage(Order order) { // 处理接收到的订单消息 }
@MessageMapping("/order/update") @SendTo("/topic/orders") public Order updateOrder(@Payload Order order) { // 更新订单信息 orderRepository.save(order); return order; }
@MessageExceptionHandler public void handleMessagingException(Exception ex) { // 记录异常信息 }
@SubscribeEvent public void handleOrderEvent(OrderEvent event) { // 处理订单事件 }}
复制代码
5. 订单数据访问层
@Repositorypublic interface OrderRepository extends JpaRepository<Order, Long> { @Modifying @Transactional void deleteByOrderDetails(String details);}
复制代码
6. 订单实体
@Entity@Table(name = "orders")public class Order { @Id @GeneratedValue(strategy = GenerationType.IDENTITY) private Long id;
private String orderDetails;
// 构造函数、getter 和 setter 省略}
复制代码
7. 订单事件
public class OrderEvent { private Order order;
// 构造函数、getter 和 setter 省略}
复制代码
启动类 OrderApplication 通过 @SpringBootApplication 注解激活 Spring Boot 应用,并使用 @Enable* 注解启用了 JMS、Kafka 和 RabbitMQ 的支持。
MessagingConfig 类配置了 WebSocket 端点、消息代理和消息监听器容器工厂。
OrderService 类包含了业务逻辑,使用 @JmsListener 注解监听 JMS 队列,@MessageMapping 注解处理 STOMP 消息,并使用 @SendTo 注解发送消息到主题。同时,它还处理了消息异常和订阅了订单事件。
OrderRepository 接口继承了 JpaRepository 并添加了自定义的修改操作。
Order 类是一个简单的实体,使用 @Entity 和 @Table 注解映射到数据库。
OrderEvent 类用于封装订单事件数据。
评论