Spring Messaging 是 Spring Framework 的一个关键模块,它通过提供一系列注解如 @EnableMessaging
、@MessageEndpoint
、@JmsListener
、@MessageMapping
、@SendTo
、@SubscribeEvent
和 @MessageExceptionHandler
,简化了消息驱动的应用程序开发。这些注解支持异步消息处理、事件发布-订阅模式,并与 Spring 的其他模块如 Spring Boot 和 Spring Integration 高度集成,使得构建复杂的消息系统变得简单而直观。
肖哥弹架构 跟大家“弹弹” 框架注解使用,需要代码关注
欢迎 点赞,关注,评论。
关注公号 Solomon 肖哥弹架构获取更多精彩内容
历史热点文章
Spring Messaging 架构图
1. 启用消息传递基础设施
@EnableMessaging
@EnableMessaging
注解用于配置类上,启用 Spring Messaging 的自动配置,是开始使用 Spring Messaging 的第一步。
无特定属性。
@Configuration
@EnableMessaging
public class MessagingConfiguration {
@Bean
public MessageConverter messageConverter() {
return new MappingJackson2MessageConverter();
}
}
复制代码
2. 配置消息中间件支持
@EnableJms
@EnableJms
注解用于启用 Java Message Service (JMS) 的支持。
无特定属性。
@Configuration
@EnableJms
public class JmsConfiguration implements JmsMessagingConfigurer {
@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
registrar.setContainerFactory("myFactory");
}
@Bean
public DefaultJmsListenerContainerFactory myFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrency("5");
return factory;
}
}
复制代码
@EnableKafka
@EnableKafka
注解用于启用 Apache Kafka 的支持。
无特定属性。
@Configuration
@EnableKafka
public class KafkaConfiguration {
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
// Configure consumer properties
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroup");
// Additional configurations
return props;
}
}
复制代码
@EnableRabbit
@EnableRabbit
注解用于启用 RabbitMQ 的支持。
无特定属性。
@Configuration
@EnableRabbit
public class RabbitConfiguration implements RabbitMessagingConfigurer {
@Override
public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
registrar.setContainerFactory("myFactory");
}
@Bean
public SimpleRabbitListenerContainerFactory myFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrentConsumers(5);
return factory;
}
}
复制代码
3. 定义消息监听器
@JmsListener
@JmsListener
注解用于方法上,定义一个方法作为 JMS 消息的监听器,指定消息的目的地。
@Component
public class OrderMessageListener {
@JmsListener(destination = "orders", containerFactory = "myFactory")
public void receiveOrderMessage(String message) {
// 将接收到的订单信息进行处理
processOrder(message);
}
private void processOrder(String orderDetails) {
// 订单处理逻辑
}
}
复制代码
4. 消息端点配置
@MessageEndpoint
@MessageEndpoint
注解用于类上,标识一个类为消息端点,可以包含多个消息处理方法。
无特定属性。
@Component
@MessageEndpoint
public class MessageEndpointExample {
@JmsListener(destination = "greetings")
public void handleGreeting(String greeting) {
System.out.println("Received greeting: " + greeting);
}
// 可以有多个 @JmsListener 方法
}
复制代码
5. 映射消息到处理程序
@MessageMapping
@MessageMapping
注解用于方法上,类似于@RequestMapping
,但它用于映射 STOMP 消息到 WebSocket 消息处理程序。
注解属性介绍
value
: 指定消息的目的地。
注解业务案例
@Controller
public class WebSocketGreetingController {
@MessageMapping("/greeting")
public Greeting greeting(GreetingMessage message) {
return new GreetingMessage("Hello, " + message.getName());
}
}
public class GreetingMessage {
private String name;
// 构造函数、getter 和 setter 省略
}
复制代码
6. 消息处理方法的参数绑定
@Payload
@Payload
注解用于方法参数上,将传入消息的有效负载绑定到方法参数。
无特定属性。
public class MessagePayloadHandler {
public void handlePayload(@Payload MyPayload payload) {
// 使用payload中的数据
processData(payload.getData());
}
private void processData(MyData data) {
// 数据处理逻辑
}
}
复制代码
@Header
@Header
注解用于方法参数上,将消息头绑定到方法参数。
注解属性介绍
value
: 指定要绑定的头的名称。
注解业务案例
public class MessageHeaderHandler {
public void handleMessage(@Header("messageType") String messageType) {
// 根据消息类型进行处理
handleBasedOnType(messageType);
}
private void handleBasedOnType(String type) {
// 类型特定处理逻辑
}
}
复制代码
7. 消息处理异常处理
@MessageExceptionHandler
@MessageExceptionHandler
注解用于方法上,处理消息处理过程中发生的异常。
无特定属性。
public class MessageExceptionHandler {
@MessageExceptionHandler
public void handleException(Exception ex) {
// 记录日志或执行其他异常处理操作
logError(ex);
}
private void logError(Exception ex) {
// 错误记录逻辑
}
}
复制代码
8. 事件订阅
@SubscribeEvent
@SubscribeEvent
注解用于方法上,标记方法订阅特定的事件或消息。
无特定属性。
public class EventSubscriber {
@SubscribeEvent
public void handleCustomEvent(MyCustomEvent event) {
// 处理自定义事件
processEvent(event);
}
private void processEvent(MyCustomEvent event) {
// 事件处理逻辑
}
}
复制代码
9. 消息发送目的地配置
@SendTo
@SendTo
注解用于方法上,指定方法的返回值应该发送到的目的地,可以是消息队列或主题。
注解属性介绍
value
: 指定消息发送的目的地。
注解业务案例
public class MessageSender {
@SendTo("/topic/responses")
public String processAndSend(String input) {
// 处理消息并返回响应
String response = "Response to " + input;
// 发送响应到指定目的地
return response;
}
}
复制代码
10、综合性注解应用案例
1. 项目结构
OrderApplication.java
- Spring Boot 启动类
MessagingConfig.java
- Spring Messaging 配置类
OrderService.java
- 订单业务逻辑服务
OrderRepository.java
- 订单数据访问层
Order.java
- 订单实体
OrderEvent.java
- 订单事件
2. Spring Boot 启动类
@SpringBootApplication
@EnableJms
@EnableKafka
@EnableRabbit
public class OrderApplication {
public static void main(String[] args) {
SpringApplication.run(OrderApplication.class, args);
}
}
复制代码
3. Spring Messaging 配置类
@Configuration
public class MessagingConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/topic", "/queue");
registry.setApplicationDestinationPrefixes("/app");
}
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerFactory(
ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setConcurrency("5");
return factory;
}
// Kafka和RabbitMQ的配置省略
}
复制代码
4. 订单业务逻辑服务
@Service
public class OrderService {
@Autowired
private OrderRepository orderRepository;
public void processOrderCreation(String orderDetails) {
Order order = new Order(orderDetails);
orderRepository.save(order);
// 发送订单创建事件
rabbitTemplate.convertAndSend("orderCreatedExchange", "order.created", order);
}
@JmsListener(destination = "orderQueue", containerFactory = "jmsListenerContainerFactory")
public void handleOrderMessage(Order order) {
// 处理接收到的订单消息
}
@MessageMapping("/order/update")
@SendTo("/topic/orders")
public Order updateOrder(@Payload Order order) {
// 更新订单信息
orderRepository.save(order);
return order;
}
@MessageExceptionHandler
public void handleMessagingException(Exception ex) {
// 记录异常信息
}
@SubscribeEvent
public void handleOrderEvent(OrderEvent event) {
// 处理订单事件
}
}
复制代码
5. 订单数据访问层
@Repository
public interface OrderRepository extends JpaRepository<Order, Long> {
@Modifying
@Transactional
void deleteByOrderDetails(String details);
}
复制代码
6. 订单实体
@Entity
@Table(name = "orders")
public class Order {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String orderDetails;
// 构造函数、getter 和 setter 省略
}
复制代码
7. 订单事件
public class OrderEvent {
private Order order;
// 构造函数、getter 和 setter 省略
}
复制代码
启动类 OrderApplication
通过 @SpringBootApplication
注解激活 Spring Boot 应用,并使用 @Enable*
注解启用了 JMS、Kafka 和 RabbitMQ 的支持。
MessagingConfig
类配置了 WebSocket 端点、消息代理和消息监听器容器工厂。
OrderService
类包含了业务逻辑,使用 @JmsListener
注解监听 JMS 队列,@MessageMapping
注解处理 STOMP 消息,并使用 @SendTo
注解发送消息到主题。同时,它还处理了消息异常和订阅了订单事件。
OrderRepository
接口继承了 JpaRepository
并添加了自定义的修改操作。
Order
类是一个简单的实体,使用 @Entity
和 @Table
注解映射到数据库。
OrderEvent
类用于封装订单事件数据。
评论