写点什么

单体架构中的事件驱动架构:Java 应用程序的渐进式重构

作者:码语者
  • 2025-10-27
    上海
  • 本文字数:14271 字

    阅读完需:约 47 分钟

单体架构中的事件驱动架构:Java应用程序的渐进式重构

传统观点认为事件驱动架构属于微服务架构范畴,服务通过消息代理进行异步通信。然而,事件驱动模式一些最具价值的应用恰恰发生在单体应用程序内部——在这些地方,紧密耦合已造成维护噩梦,而渐进式重构则提供了一条通往更好架构的路径,且无需分布式系统的运维复杂性。

为何在单体应用中使用事件有意义

传统的分层单体应用存在一个特定问题:直接的方法调用在组件之间创建了僵化的依赖关系。您的订单处理代码直接调用库存管理,库存管理又调用仓库系统,继而触发电子邮件通知。每个组件都了解其他几个组件,从而形成一个纠缠不清的网,更改其中一部分需要理解并测试它所触及的所有内容。


事件驱动模式引入了间接性。当下单时,订单服务发布一个"OrderPlaced"事件。其他对订单感兴趣的组件——库存、发货、通知——订阅此事件并独立响应。订单服务不知道也不关心谁在监听。即使这些组件存在于同一个代码库并共享同一个数据库,它们也变得松散耦合。


这种方法提供了立竿见影的好处,而无需将应用程序拆分为微服务。您在保持单体应用运维简单性的同时,获得了可测试性、灵活性和更清晰的边界。当您最终需要提取服务时,事件驱动的结构使得过渡更加平滑,因为组件已经通过定义良好的消息进行通信,而不是直接的方法调用。

起点:一个紧密耦合的订单系统

考虑一个使用 Spring Boot 构建的典型电子商务单体应用。订单创建流程如下所示:


@Service@Transactionalpublic class OrderService {    private final OrderRepository orderRepository;    private final InventoryService inventoryService;    private final PaymentService paymentService;    private final ShippingService shippingService;    private final LoyaltyService loyaltyService;    private final EmailService emailService;    private final AnalyticsService analyticsService;         public OrderService(        OrderRepository orderRepository,        InventoryService inventoryService,        PaymentService paymentService,        ShippingService shippingService,        LoyaltyService loyaltyService,        EmailService emailService,        AnalyticsService analyticsService    ) {        this.orderRepository = orderRepository;        this.inventoryService = inventoryService;        this.paymentService = paymentService;        this.shippingService = shippingService;        this.loyaltyService = loyaltyService;        this.emailService = emailService;        this.analyticsService = analyticsService;    }         public Order createOrder(CreateOrderRequest request) {        // 验证库存        for (OrderItem item : request.getItems()) {            if (!inventoryService.checkAvailability(item.getProductId(), item.getQuantity())) {                throw new InsufficientInventoryException(item.getProductId());            }        }                 // 处理支付        PaymentResult payment = paymentService.processPayment(            request.getCustomerId(),            calculateTotal(request.getItems()),            request.getPaymentDetails()        );                 if (!payment.isSuccessful()) {            throw new PaymentFailedException(payment.getErrorMessage());        }                 // 创建订单        Order order = new Order(            request.getCustomerId(),            request.getItems(),            payment.getTransactionId()        );        order.setStatus(OrderStatus.CONFIRMED);        Order savedOrder = orderRepository.save(order);                 // 预留库存        for (OrderItem item : request.getItems()) {            inventoryService.reserveInventory(item.getProductId(), item.getQuantity());        }                 // 创建发货单        shippingService.createShipment(savedOrder);                 // 更新忠诚度积分        loyaltyService.addPoints(            request.getCustomerId(),            calculateLoyaltyPoints(savedOrder)        );                 // 发送确认邮件        emailService.sendOrderConfirmation(savedOrder);                 // 跟踪分析        analyticsService.trackOrderPlaced(savedOrder);                 return savedOrder;    }}
复制代码


这段代码可以工作,但存在严重问题。OrderService 知道七个不同的服务。测试需要模拟所有这些服务。添加新的订单后操作意味着要修改此方法。如果电子邮件服务缓慢,订单创建就会变慢。如果分析跟踪失败,整个订单就会失败并回滚。


事务边界也是错误的。所有操作都在单个数据库事务中发生,这意味着即使电子邮件服务临时停机也会阻止订单创建。库存预留和发货单创建在事务上耦合,尽管它们在逻辑上是独立的操作。

引入 Spring 应用事件

Spring Framework 提供了一个内置的事件系统,在单个 JVM 内工作。默认情况下它是同步的,这使得它易于推理和调试。首先定义领域事件:


public abstract class DomainEvent {    private final Instant occurredAt;    private final String eventId;         protected DomainEvent() {        this.occurredAt = Instant.now();        this.eventId = UUID.randomUUID().toString();    }         public Instant getOccurredAt() {        return occurredAt;    }         public String getEventId() {        return eventId;    }} public class OrderPlacedEvent extends DomainEvent {    private final Long orderId;    private final Long customerId;    private final List<OrderItem> items;    private final BigDecimal totalAmount;         public OrderPlacedEvent(Order order) {        super();        this.orderId = order.getId();        this.customerId = order.getCustomerId();        this.items = new ArrayList<>(order.getItems());        this.totalAmount = order.getTotalAmount();    }         // Getters}
复制代码


事件应该是不可变的,并包含订阅者需要的所有信息。避免直接传递实体——而是复制相关数据。这可以防止订阅者意外修改共享状态。


重构 OrderService 以发布事件,而不是直接调用服务:


@Service@Transactionalpublic class OrderService {    private final OrderRepository orderRepository;    private final InventoryService inventoryService;    private final PaymentService paymentService;    private final ApplicationEventPublisher eventPublisher;         public OrderService(        OrderRepository orderRepository,        InventoryService inventoryService,        PaymentService paymentService,        ApplicationEventPublisher eventPublisher    ) {        this.orderRepository = orderRepository;        this.inventoryService = inventoryService;        this.paymentService = paymentService;        this.eventPublisher = eventPublisher;    }         public Order createOrder(CreateOrderRequest request) {        // 验证库存        for (OrderItem item : request.getItems()) {            if (!inventoryService.checkAvailability(item.getProductId(), item.getQuantity())) {                throw new InsufficientInventoryException(item.getProductId());            }        }                 // 处理支付        PaymentResult payment = paymentService.processPayment(            request.getCustomerId(),            calculateTotal(request.getItems()),            request.getPaymentDetails()        );                 if (!payment.isSuccessful()) {            throw new PaymentFailedException(payment.getErrorMessage());        }                 // 创建并保存订单        Order order = new Order(            request.getCustomerId(),            request.getItems(),            payment.getTransactionId()        );        order.setStatus(OrderStatus.CONFIRMED);        Order savedOrder = orderRepository.save(order);                 // 同步预留库存(仍在关键路径上)        for (OrderItem item : request.getItems()) {            inventoryService.reserveInventory(item.getProductId(), item.getQuantity());        }                 // 为非关键操作发布事件        eventPublisher.publishEvent(new OrderPlacedEvent(savedOrder));                 return savedOrder;    }}
复制代码


现在 OrderService 仅依赖四个组件,而不是八个。更重要的是,它只了解对订单创建至关重要的操作——库存验证、支付处理和库存预留。其他所有操作都通过事件发生。


为解耦的操作创建事件监听器:


@Componentpublic class OrderEventListeners {    private static final Logger logger = LoggerFactory.getLogger(OrderEventListeners.class);         private final ShippingService shippingService;    private final LoyaltyService loyaltyService;    private final EmailService emailService;    private final AnalyticsService analyticsService;         public OrderEventListeners(        ShippingService shippingService,        LoyaltyService loyaltyService,        EmailService emailService,        AnalyticsService analyticsService    ) {        this.shippingService = shippingService;        this.loyaltyService = loyaltyService;        this.emailService = emailService;        this.analyticsService = analyticsService;    }         @EventListener    @Transactional(propagation = Propagation.REQUIRES_NEW)    public void handleOrderPlaced(OrderPlacedEvent event) {        try {            shippingService.createShipment(event.getOrderId());        } catch (Exception e) {            logger.error("Failed to create shipment for order {}", event.getOrderId(), e);            // 不要重新抛出 - 其他监听器仍应执行        }    }         @EventListener    @Transactional(propagation = Propagation.REQUIRES_NEW)    public void updateLoyaltyPoints(OrderPlacedEvent event) {        try {            int points = calculatePoints(event.getTotalAmount());            loyaltyService.addPoints(event.getCustomerId(), points);        } catch (Exception e) {            logger.error("Failed to update loyalty points for order {}", event.getOrderId(), e);        }    }         @EventListener    public void sendConfirmationEmail(OrderPlacedEvent event) {        try {            emailService.sendOrderConfirmation(event.getOrderId());        } catch (Exception e) {            logger.error("Failed to send confirmation email for order {}", event.getOrderId(), e);        }    }         @EventListener    public void trackAnalytics(OrderPlacedEvent event) {        try {            analyticsService.trackOrderPlaced(event.getOrderId(), event.getTotalAmount());        } catch (Exception e) {            logger.error("Failed to track analytics for order {}", event.getOrderId(), e);        }    }}
复制代码


每个监听器在它自己的事务中运行(在适当的时候)并独立处理故障。如果发送电子邮件失败,发货单创建仍然会发生。即使分析跟踪抛出异常,订单创建事务也会成功提交。

理解事务边界

@Transactional(propagation = Propagation.REQUIRES_NEW) 注解至关重要。没有它,所有监听器都会参与订单创建事务。如果任何监听器失败,整个订单都会回滚——这正是我们试图避免的情况。


使用 REQUIRES_NEW,每个监听器都会启动一个新的事务。当监听器运行时,订单已经提交。这意味着:


  • 监听器无法阻止订单创建

  • 监听器故障不会回滚订单

  • 每个监听器的工作是独立原子性的


但这有一个权衡。如果监听器失败,订单存在但某些后处理没有发生。您需要处理这些部分故障的策略:


@EventListener@Transactional(propagation = Propagation.REQUIRES_NEW)public void handleOrderPlaced(OrderPlacedEvent event) {    try {        shippingService.createShipment(event.getOrderId());    } catch (Exception e) {        logger.error("Failed to create shipment for order {}", event.getOrderId(), e);                 // 记录失败以便重试        failedEventRepository.save(new FailedEvent(            event.getClass().getSimpleName(),            event.getEventId(),            "handleOrderPlaced",            e.getMessage()        ));    }}
复制代码


一个单独的后台作业可以重试失败的事件:


@Componentpublic class FailedEventRetryJob {    private final FailedEventRepository failedEventRepository;    private final ApplicationEventPublisher eventPublisher;         @Scheduled(fixedDelay = 60000) // 每分钟    public void retryFailedEvents() {        List failures = failedEventRepository.findRetryable();                 for (FailedEvent failure : failures) {            try {                // 重建并重新发布事件                DomainEvent event = reconstructEvent(failure);                eventPublisher.publishEvent(event);                                 failure.markRetried();                failedEventRepository.save(failure);            } catch (Exception e) {                logger.warn("Retry failed for event {}", failure.getEventId(), e);                failure.incrementRetryCount();                failedEventRepository.save(failure);            }        }    }}
复制代码


这种模式提供了最终一致性——系统可能暂时不一致,但通过重试自行恢复。

转向异步事件

Spring 的 @EventListener 默认是同步的。事件处理发生在发布事件的同一线程中,发布者等待所有监听器完成。这提供了强有力的保证,但限制了可扩展性。


通过启用异步支持并注解监听器来使监听器异步:


@Configuration@EnableAsyncpublic class AsyncConfig {    @Bean(name = "eventExecutor")    public Executor eventExecutor() {        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();        executor.setCorePoolSize(4);        executor.setMaxPoolSize(10);        executor.setQueueCapacity(100);        executor.setThreadNamePrefix("event-");        executor.initialize();        return executor;    }} @Componentpublic class OrderEventListeners {    // ... 依赖 ...         @Async("eventExecutor")    @EventListener    @Transactional(propagation = Propagation.REQUIRES_NEW)    public void handleOrderPlaced(OrderPlacedEvent event) {        shippingService.createShipment(event.getOrderId());    }         @Async("eventExecutor")    @EventListener    public void sendConfirmationEmail(OrderPlacedEvent event) {        emailService.sendOrderConfirmation(event.getOrderId());    }}
复制代码


使用 @AsynccreateOrder() 方法在发布事件后立即返回。监听器在线程池中并发执行。这显著提高了响应时间——订单创建不再等待电子邮件发送或分析跟踪。


但异步事件引入了新的复杂性。当监听器执行时,订单创建事务可能尚未提交。监听器可能尝试从数据库加载订单,但由于事务仍在进行中而找不到它。


Spring 提供了 @TransactionalEventListener 来处理这种情况:


@Componentpublic class OrderEventListeners {    @Async("eventExecutor")    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)    public void handleOrderPlaced(OrderPlacedEvent event) {        // 这仅在订单创建事务成功提交后运行        shippingService.createShipment(event.getOrderId());    }}
复制代码


AFTER_COMMIT 阶段确保监听器仅在发布事务成功提交后运行。如果订单创建失败并回滚,监听器永远不会执行。这可以防止处理实际上不存在的订单的事件。

实现事件存储

随着事件驱动架构的成熟,存储事件变得有价值。事件存储提供了审计日志,支持调试,并支持更复杂的模式,如事件溯源。


创建一个简单的事件存储:


@Entity@Table(name = "domain_events")public class StoredEvent {    @Id    @GeneratedValue(strategy = GenerationType.IDENTITY)    private Long id;         @Column(nullable = false)    private String eventId;         @Column(nullable = false)    private String eventType;         @Column(nullable = false, columnDefinition = "TEXT")    private String payload;         @Column(nullable = false)    private Instant occurredAt;         @Column(nullable = false)    private Instant storedAt;         @Column    private String aggregateId;         @Column    private String aggregateType;         // 构造器、getter、setter} @Repositorypublic interface StoredEventRepository extends JpaRepository<StoredEvent, Long> {    List<StoredEvent> findByAggregateIdOrderByOccurredAt(String aggregateId);    List<StoredEvent> findByEventType(String eventType);}
复制代码


拦截并存储所有领域事件:


@Componentpublic class EventStoreListener {    private final StoredEventRepository repository;    private final ObjectMapper objectMapper;         public EventStoreListener(StoredEventRepository repository, ObjectMapper objectMapper) {        this.repository = repository;        this.objectMapper = objectMapper;    }         @EventListener    @Order(Ordered.HIGHEST_PRECEDENCE) // 在其他监听器之前存储    @Transactional(propagation = Propagation.REQUIRES_NEW)    public void storeEvent(DomainEvent event) {        try {            StoredEvent stored = new StoredEvent();            stored.setEventId(event.getEventId());            stored.setEventType(event.getClass().getSimpleName());            stored.setPayload(objectMapper.writeValueAsString(event));            stored.setOccurredAt(event.getOccurredAt());            stored.setStoredAt(Instant.now());                         // 如果可用,提取聚合信息            if (event instanceof OrderPlacedEvent) {                OrderPlacedEvent orderEvent = (OrderPlacedEvent) event;                stored.setAggregateId(orderEvent.getOrderId().toString());                stored.setAggregateType("Order");            }                         repository.save(stored);        } catch (JsonProcessingException e) {            throw new EventStoreException("Failed to serialize event", e);        }    }}
复制代码


现在,每个领域事件在业务逻辑处理之前都会持久化。您可以通过重放事件来重建系统中发生的情况:


@Servicepublic class OrderHistoryService {    private final StoredEventRepository eventRepository;         public List<OrderEvent> getOrderHistory(Long orderId) {        List<StoredEvent> events = eventRepository.findByAggregateIdOrderByOccurredAt(            orderId.toString()        );                 return events.stream()            .map(this::deserializeEvent)            .collect(Collectors.toList());    }         private OrderEvent deserializeEvent(StoredEvent stored) {        // 根据事件类型反序列化        try {            Class<?> eventClass = Class.forName("com.example.events." + stored.getEventType());            return (OrderEvent) objectMapper.readValue(stored.getPayload(), eventClass);        } catch (Exception e) {            throw new EventStoreException("Failed to deserialize event", e);        }    }}
复制代码


这实现了强大的调试能力。当客户报告其订单问题时,您可以准确看到发生了什么事件以及发生的顺序。

Saga 和补偿操作

某些工作流需要跨多个步骤进行协调,其中每个步骤都可能失败。传统方法使用分布式事务,但这些方法扩展性不佳且增加了复杂性。Saga 使用编排事件和补偿操作提供了一种替代方案。


考虑一个更复杂的订单流程,您需要:


  1. 预留库存

  2. 处理支付

  3. 创建发货单


如果在预留库存后支付失败,您需要释放预留。通过补偿事件实现这一点:


public class InventoryReservedEvent extends DomainEvent {    private final Long orderId;    private final List<ReservationDetail> reservations;         // 构造器、getter} public class PaymentFailedEvent extends DomainEvent {    private final Long orderId;    private final String reason;         // 构造器、getter} @Componentpublic class InventorySagaHandler {    private final InventoryService inventoryService;         @EventListener    public void handlePaymentFailed(PaymentFailedEvent event) {        // 补偿操作:释放预留库存        inventoryService.releaseReservation(event.getOrderId());    }}
复制代码


Saga 通过事件而不是中央协调器进行协调:


@Servicepublic class OrderSagaService {    private final ApplicationEventPublisher eventPublisher;    private final InventoryService inventoryService;    private final PaymentService paymentService;         public void processOrder(Order order) {        // 步骤 1: 预留库存        List<ReservationDetail> reservations = inventoryService.reserve(order.getItems());        eventPublisher.publishEvent(new InventoryReservedEvent(order.getId(), reservations));                 try {            // 步骤 2: 处理支付            PaymentResult payment = paymentService.processPayment(order);                         if (payment.isSuccessful()) {                eventPublisher.publishEvent(new PaymentSucceededEvent(order.getId(), payment));            } else {                // 触发补偿                eventPublisher.publishEvent(new PaymentFailedEvent(order.getId(), payment.getReason()));                throw new PaymentException(payment.getReason());            }        } catch (Exception e) {            // 触发补偿            eventPublisher.publishEvent(new PaymentFailedEvent(order.getId(), e.getMessage()));            throw e;        }    }}
复制代码


这种模式在没有分布式事务的情况下保持了一致性。每个步骤发布记录所发生事件的事件。当发生故障时,补偿事件会触发撤销先前步骤的操作。

桥接到外部消息代理

随着单体应用的增长,您可能希望与外部系统集成或为最终的服务提取做准备。Spring Cloud Stream 提供了对 RabbitMQ 或 Kafka 等消息代理的抽象,同时保持相同的事件驱动模式:


<dependency>    <groupId>org.springframework.cloud</groupId>    <artifactId>spring-cloud-stream</artifactId></dependency><dependency>    <groupId>org.springframework.cloud</groupId>    <artifactId>spring-cloud-stream-binder-kafka</artifactId></dependency>
复制代码


application.yml 中配置绑定:


spring:  cloud:    stream:      bindings:        orderPlaced-out-0:          destination: order.placed        orderPlaced-in-0:          destination: order.placed          group: order-processors      kafka:        binder:          brokers: localhost:9092
复制代码


创建内部事件和外部消息之间的桥接:


@Componentpublic class EventPublisher {    private final StreamBridge streamBridge;         public EventPublisher(StreamBridge streamBridge) {        this.streamBridge = streamBridge;    }         @EventListener    public void publishToExternalBroker(OrderPlacedEvent event) {        // 将内部事件发布到外部消息代理        streamBridge.send("orderPlaced-out-0", event);    }} @Componentpublic class ExternalEventConsumer {    private final ApplicationEventPublisher eventPublisher;         public ExternalEventConsumer(ApplicationEventPublisher eventPublisher) {        this.eventPublisher = eventPublisher;    }         @Bean    public Consumer<OrderPlacedEvent> orderPlaced() {        return event -> {            // 将外部事件重新发布为内部事件            eventPublisher.publishEvent(event);        };    }}
复制代码


这种模式让您可以选择性地将事件发布到外部,同时将内部事件保留在本地。关键的实时操作使用内部事件以实现低延迟。跨服务通信使用消息代理以实现可靠性和可扩展性。

监控与可观测性

事件驱动系统引入了新的可观测性挑战。理解正在发生的情况需要跨多个异步处理步骤跟踪事件。实施全面的日志记录和指标:


@Aspect@Componentpublic class EventMonitoringAspect {    private static final Logger logger = LoggerFactory.getLogger(EventMonitoringAspect.class);    private final MeterRegistry meterRegistry;         public EventMonitoringAspect(MeterRegistry meterRegistry) {        this.meterRegistry = meterRegistry;    }         @Around("@annotation(org.springframework.context.event.EventListener)")    public Object monitorEventListener(ProceedingJoinPoint joinPoint) throws Throwable {        String listenerName = joinPoint.getSignature().getName();        Object[] args = joinPoint.getArgs();        DomainEvent event = (DomainEvent) args[0];                 Timer.Sample sample = Timer.start(meterRegistry);                 try {            logger.info("Processing event {} in listener {}",                 event.getEventId(), listenerName);                         Object result = joinPoint.proceed();                         sample.stop(Timer.builder("event.listener.duration")                .tag("listener", listenerName)                .tag("event_type", event.getClass().getSimpleName())                .tag("status", "success")                .register(meterRegistry));                         meterRegistry.counter("event.listener.processed",                "listener", listenerName,                "event_type", event.getClass().getSimpleName(),                "status", "success"            ).increment();                         return result;        } catch (Exception e) {            sample.stop(Timer.builder("event.listener.duration")                .tag("listener", listenerName)                .tag("event_type", event.getClass().getSimpleName())                .tag("status", "failure")                .register(meterRegistry));                         meterRegistry.counter("event.listener.processed",                "listener", listenerName,                "event_type", event.getClass().getSimpleName(),                "status", "failure"            ).increment();                         logger.error("Error processing event {} in listener {}",                 event.getEventId(), listenerName, e);                         throw e;        }    }}
复制代码


这个切面自动跟踪每个事件监听器的执行时间和成功率。结合 Prometheus 和 Grafana 等工具,您可以可视化事件处理模式并识别瓶颈。


添加关联 ID 以跟踪系统中的事件:


public abstract class DomainEvent {    private final Instant occurredAt;    private final String eventId;    private final String correlationId;         protected DomainEvent(String correlationId) {        this.occurredAt = Instant.now();        this.eventId = UUID.randomUUID().toString();        this.correlationId = correlationId != null ? correlationId : UUID.randomUUID().toString();    }         // Getters}
复制代码


通过事件链传播关联 ID:


@EventListenerpublic void handleOrderPlaced(OrderPlacedEvent event) {    MDC.put("correlationId", event.getCorrelationId());         try {        // 执行工作                 // 发布具有相同关联 ID 的后续事件        eventPublisher.publishEvent(new ShipmentCreatedEvent(            event.getOrderId(),            event.getCorrelationId()        ));    } finally {        MDC.clear();    }}
复制代码


现在,与单个订单流相关的所有日志消息共享一个关联 ID,使得跨多个异步操作跟踪整个工作流变得微不足道。

测试事件驱动代码

事件驱动架构需要不同的测试策略。传统的单元测试适用于单个监听器,但集成测试对于验证事件流变得更加重要:


@SpringBootTest@TestConfigurationpublic class OrderEventIntegrationTest {    @Autowired    private ApplicationEventPublisher eventPublisher;         @Autowired    private ShippingService shippingService;         @Autowired    private EmailService emailService;         @Test    public void shouldProcessOrderPlacedEventCompletely() throws Exception {        // 给定        Order order = createTestOrder();        OrderPlacedEvent event = new OrderPlacedEvent(order);                 // 当        eventPublisher.publishEvent(event);                 // 等待异步处理        await().atMost(5, TimeUnit.SECONDS).untilAsserted(() -> {            // 然后            verify(shippingService).createShipment(order.getId());            verify(emailService).sendOrderConfirmation(order.getId());        });    }}
复制代码


对于单元测试,注入一个间谍事件发布器以验证事件是否正确发布:


@ExtendWith(MockitoExtension.class)public class OrderServiceTest {    @Mock    private OrderRepository orderRepository;         @Mock    private InventoryService inventoryService;         @Mock    private PaymentService paymentService;         @Spy    private ApplicationEventPublisher eventPublisher = new SimpleApplicationEventPublisher();         @InjectMocks    private OrderService orderService;         @Test    public void shouldPublishOrderPlacedEventAfterCreatingOrder() {        // 给定        CreateOrderRequest request = createValidRequest();                 when(inventoryService.checkAvailability(any(), anyInt())).thenReturn(true);        when(paymentService.processPayment(any(), any(), any()))            .thenReturn(PaymentResult.successful("txn-123"));        when(orderRepository.save(any())).thenAnswer(inv -> inv.getArgument(0));                 // 当        orderService.createOrder(request);                 // 然后        verify(eventPublisher).publishEvent(argThat(event ->             event instanceof OrderPlacedEvent        ));    }}
复制代码

迁移之旅

将单体应用重构为使用事件驱动架构并非全有或全无的命题。从一个工作流开始——通常是造成最多痛苦的那个。识别可以事件驱动的直接服务调用,并逐步引入事件。


从同步事件开始,以最小化行为变更。一旦事件正确流动,为非关键监听器切换到异步处理。当您需要审计跟踪或调试能力时,添加事件存储。仅当您需要跨服务通信或准备提取微服务时,才集成外部消息代理。


目标不是实现完美的事件驱动架构。而是减少耦合、提高可测试性,并在组件之间创建更清晰的边界。即使是部分采用也能提供价值——具有一些事件驱动模式的单体应用比完全没有的模式更易于维护。


这种渐进式方法使您能够持续交付价值,而不是投入一个需要数月时间、直到完全结束时才能交付任何成果的重构项目。您能够了解在特定领域和团队中哪些方法有效,根据实际经验而非理论理想来调整实施策略。

有用的资源

  • Spring 应用事件:https://docs.spring.io/spring-framework/reference/core/beans/context-introduction.html#context-functionality-events 关于应用事件系统以及如何有效使用它的官方 Spring 文档。

  • Spring Cloud Stream:https://spring.io/projects/spring-cloud-stream 用于构建事件驱动微服务的框架,在桥接到外部消息代理时很有用。

  • Eric Evans 的《领域驱动设计》:https://www.domainlanguage.com/ddd/理解事件驱动系统中的领域事件和限界上下文的必读材料。

  • 企业集成模式:https://www.enterpriseintegrationpatterns.com/适用于事件驱动架构的消息传递模式的全面目录。

  • Micrometer:https://micrometer.io/用于监控事件处理和系统行为的应用指标库。

  • Awaitility:https://github.com/awaitility/awaitility 用于异步系统的测试库,对于事件驱动代码的集成测试至关重要。




【注】本文译自: Event-Driven Architecture in Monoliths: Incremental Refactoring for Java Apps - Java Code Geeks

发布于: 刚刚阅读数: 5
用户头像

码语者

关注

分享程序人生。 2019-07-04 加入

“码”界老兵,分享程序人生。

评论

发布
暂无评论
单体架构中的事件驱动架构:Java应用程序的渐进式重构_Java_码语者_InfoQ写作社区