写点什么

SpringBoot 项目应该这样发送事件消息,很优雅!

作者:架构师之道
  • 2025-01-06
    湖南
  • 本文字数:6274 字

    阅读完需:约 21 分钟

在日常项目开发中,我们常常遇到类似的业务场景:“如果发生……,则……”“当完成……后,请通知……”“发生……时,需要……” 等。为了解耦业务流程并避免对主流程的影响,通常我们会采用领域事件的方式。在主业务流程完成后,发送一条消息,由消息监听者负责处理后续业务流程。最常见的实现方式是通过ApplicationEventPublisher发送消息,使用@EventListener注解来监听并处理相应的业务逻辑。

然而,在最近的代码走查中,我发现项目中发送消息的方式各不相同,问题颇多。有的同事直接将数据库 DO 对象作为消息体发送;有的则使用一个公共消息体对象,并通过类型进行区分,最终在消费逻辑中根据类型选择不同的处理逻辑。这样的实现方式存在很多问题。那么,我们该如何设计一个通用的领域事件发送流程呢?

本文将对此进行探讨。

基类设计

在领域事件的业务场景中,出现 “如果发生…,则…” 时,通常是领域对象发生了变化,常见的操作有创建更新删除

我们可以首先定义一个枚举类,表示事件变更的类型:

public enum EventTypeEnum {      DELETE,      CREATE,      UPDATE;      /**    * Acquires an EventTypeEnum by its name. * @param name    * @return    */    public static EventTypeEnum acquireByName(String name){          return Arrays.stream(EventTypeEnum.values())                  .filter(e -> Objects.equals(e.name(), name))                  .findFirst()                  .orElseThrow(() -> new RuntimeException(String.format(" this EventType can not support %s", name)));      }    }
复制代码

事件对象通常需要包含事件唯一编号、事件发生时间等基本属性,并且具有不可变性。为了统一时间的定义过程,我们可以对事件对象进行抽象,提炼出事件基类BaseEvent

@Getter  public abstract class BaseEvent implements Serializable {      private final String eventId;      private final LocalDateTime eventTime;        public BaseEvent() {          this.eventId = UUID.randomUUID().toString();          this.eventTime = LocalDateTime.now();      }  }
复制代码

在发送事件时,我们可以直接将对象发送。结合这一点,可以设计一个泛型包装类,代表领域事件的基类DomainEvent。在变更操作时,系统需要记录原始对象和变更后的对象。在DomainEvent中,可以使用sourceafter属性来表示这些对象,同时提供了一个buildContext方法用于构建对象变更说明,可用于记录日志。

@Getter  public class DomainEvent<T> extends BaseEvent{      private final T source;    private final T after;      private final EventTypeEnum eventType;          public DomainEvent(final T source, final T after, final EventTypeEnum eventType) {          this.eventType = eventType;          this.source = source;          this.after = after;      }          public String beforeSnapshot(){          return Objects.toString(source, "before unknown");      }        public String afterSnapshot() {          return Objects.toString(after, "after unknown");      }        /**       *  Builds the context of the event change.     */         public String buildContext() {           return String.format("%s changed(%s)[%s = > %s]", source.getClass().getTypeName(), eventTypeEnum.toString(), beforeSnapshot(), afterSnapshot());      }    }
复制代码

至此,我们完成了领域事件基类的设计,类图如下所示:

接口实现设计

在项目中,如果是进程内通信,可以基于ApplicationEventPublisher发布事件;而对于跨进程交互,则需要借助消息队列。无论何种实现方式,我们都应设计一个通用的领域事件发送接口。该接口提供onCreatedonUpdatedonDeleted方法,分别对应事件的创建、更新和删除逻辑。由于并非所有业务场景都需要实现这三种事件变更,这里我们使用default修饰符,以便业务按需实现。

public interface DataChangedEventPublisher<T> {    /**     * 处理新数据实体的创建。     */    default void onCreated(final T data){        DomainEvent<T> event = new DomainEvent<>(data, null, EventTypeEnum.CREATE);        publish(event);    }

    /**     * 处理数据实体的删除。     */    default void onDeleted(final T data){        DomainEvent<T> event = new DomainEvent<>(data, null, EventTypeEnum.DELETE);        publish(event);    }


    /**     * 处理数据实体的更新。     */    default void onUpdated(final T data, final T before){        DomainEvent<T> event = new DomainEvent<>(data, before, EventTypeEnum.UPDATE);        publish(event);    }

    /**     * Publishes the given DomainEvent.     */    void publish(DomainEvent<T> domainEvent);}
复制代码

消息发送示例

一旦定义了DataChangedEventPublisher接口,我们就可以实现消息发送逻辑。假设我们有一个业务场景:当用户创建或更新时需要发送消息。

  1. 定义用户对象

@Datapublic class Account {    private String name;    private String nickName;    private Long id;    private Integer age;}
复制代码
  1. 创建变更消息对象

public class AccountChangedEvent extends DomainEvent<Account> {
    public AccountChangedEvent(Account source, Account before, OperationType operationType) {        super(source, before, operationType);    }
    @Override    public String buildContext() {        final Account source = getSource();        if (Objects.isNull(getAfter())) {            return String.format("the account [%s] is %s", source.getName(), StringUtils.lowerCase(getOperationType().toString()));        }        return String.format("the account [%s] is %s : %s", source.getName(), StringUtils.lowerCase(getOperationType().toString()), contrast());    }

    /**     * 字段对比     */    private Object contrast() {        final Account before =  getSource();        Objects.requireNonNull(before);        final Account after =  getAfter();        Objects.requireNonNull(after);        if (Objects.equals(before, after)) {            return "it no change";        }        final StringBuilder builder = new StringBuilder();        if (!Objects.equals(before.getName(), after.getName())) {            builder.append(String.format("name[%s => %s] ", before.getName(), after.getName()));        }        if (!Objects.equals(before.getUniqueName(), after.getNickName())) {            builder.append(String.format("uniqueName[%s => %s] ", before.getUniqueName(), after.getUniqueName()));        }        if (!Objects.equals(before.getAge(), after.getAge())) {            builder.append(String.format("age[%s => %s] ", before.getAge(), after.getAge()));        }        if (!Objects.equals(before.getNickName(), after.getNickName())) {            builder.append(String.format("nickName[%s => %s] ", before.getNickName(), after.getNickName()));        }        return builder.toString();    }}
复制代码

重写buildContext()方法用于构建用户变更的详细信息,根据业务需要自由实现。

  1. 实现事件变更接口

@Slf4j@Servicepublic class AccountChangedPublisher implements DataChangedEventPublisher<Account> {
    private final ApplicationEventPublisher applicationEventPublisher;
    public AccountChangedPublisher(ApplicationEventPublisher applicationEventPublisher) {        this.applicationEventPublisher = applicationEventPublisher;    }
    @Override    public void onCreated(Account data) {        AccountChangedEvent changedEvent = new AccountChangedEvent(data, null, OperationType.CREATE);        publish(changedEvent);    }
    @Override    public void onUpdated(Account data, Account after) {        AccountChangedEvent changedEvent = new AccountChangedEvent(data, after, OperationType.UPDATE);        publish(changedEvent);    }

    @Override    public void publish(DomainEvent<Account> domainEvent) {        String context = domainEvent.buildContext();        log.info(context);        applicationEventPublisher.publishEvent(domainEvent);    }}
复制代码
  1. 注入并使用 Publisher 进行消息发送

业务逻辑中可以注入AccountChangedPublisher进行消息发送,同时监听AccountChangedEvent

@RestController@RequestMapping("/api/demo/account")@RequiredArgsConstructorpublic class AccountController {
    private final AccountChangedPublisher publisher;
    @GetMapping("/message")    public void demo() {        Account account = new Account();        account.setAge(23);        account.setName("Jam");        account.setNickName("张张");
        Account account2 = new Account();        account2.setNickName("赵赵");        account2.setAge(24);        account2.setName("Jam");
        //发送变更消息        publisher.onUpdated(account,account2);    }}
@Componentpublic class AccountEventConsumer {    @EventListener    public void handleSyncErrorEvent(AccountChangedEvent event) {        // Handle the event here        System.out.println("Received event: " + event.buildContext());    }}
复制代码
  1. 日志输出

当消息发送后,可以在日志中看到以下信息:

INFO  c.j.d.m.t.e.a.AccountChangedPublisher - the account [Jam] is update : age[23 => 24] nickName[张张 => 赵赵] Received event: the account [Jam] is update : age[23 => 24] nickName[张张 => 赵赵] 
复制代码

通过以上步骤,我们完成了事件消息的发送流程,整体类图如下所示:

设计扩展

该设计可以进行进一步扩展。如果系统要求存储所有发送过的消息以便后期检索和分析,我们可以创建一个抽象类AbstractLoggingEventPublisher,用于处理此逻辑,后续事件发送器可直接继承此类。


@Slf4jpublic abstract class AbstractLoggingEventPublisher<T> implements DataChangedEventPublisher<T>{    @Autowired    private LogService logService;
    protected AbstractLoggingEventPublisher(){
    }
    /**     * Publishes the given DomainEvent.     * Logs the event details and delegates the actual publishing to the subclass.     */    @Override    public void publish(DomainEvent<T> domainEvent) {        logEvent(domainEvent);        publishEvent(domainEvent);    }
    /**     * Logs the details of the given DomainEvent.     * @param domainEvent the event to be logged     */    private void logEvent(DomainEvent<T> domainEvent) {        logService.saveLog(domainEvent);    }

    /**     * Abstract method for publishing the given DomainEvent.     * @param domainEvent the event to be published     */    protected abstract void publishEvent(DomainEvent<T> domainEvent) ;}
复制代码

此时,AccountChangedPublisher可以直接继承AbstractLoggingEventPublisher,代码如下:

@Slf4j@Servicepublic class AccountChangedEventPublisher extends AbstractLoggingEventPublisher<Account> {
    private final ApplicationEventPublisher applicationEventPublisher;
    public AccountChangedEventPublisher3(ApplicationEventPublisher applicationEventPublisher) {        this.applicationEventPublisher = applicationEventPublisher;    }
    @Override    public void onUpdated(Account data, Account after) {        AccountChangedEvent changedEvent = new AccountChangedEvent(data, after, OperationType.UPDATE);        publish(changedEvent);    }        @Override    public void onCreated(Account data) {        AccountChangedEvent changedEvent = new AccountChangedEvent(data, null, OperationType.CREATE);        publish(changedEvent);    }

    @Override    protected void publishEvent(DomainEvent<Account> domainEvent) {        applicationEventPublisher.publishEvent(domainEvent);    }}
复制代码

总结

应用开发中,解耦业务流程至关重要,领域事件模式提供了一种有效的解决方案。通过设计通用的事件发布接口DataChangedEventPublisher,我们能够实现对创建、更新和删除等业务操作的统一处理。借助DomainEvent类封装事件信息,确保了事件的完整性和可追溯性。

用户头像

还未添加个人签名 2022-04-10 加入

还未添加个人简介

评论

发布
暂无评论
SpringBoot项目应该这样发送事件消息,很优雅!_Java_架构师之道_InfoQ写作社区