做变化的催化剂
Be a Catalyst for Change
你不能强迫人们改变。相反,要向他们展示未来可能会怎么样,并帮助他们参与未来的创造。
-- 提示 5 《程序员修炼之道 -- 从小工到专家》
写在前面
前段时间发现 Spring 的 Event 超级好用,所以已经逐步在项目中加入了 Spring Event 的功能。
开发环境:
Java 1.8Spring Boot 2.1.6.RELEASESpring 5.1.8.RELEASE
基本开发
Event 是 Spring 中的概念,不是 Spring Event 所有的。只要添加了spring-context
依赖就可以引入了 Spring 的事件。
要使用 Event 只要准备三个部分:
事件类:定义事件,继承ApplicationEvent
的类成为一个事件类。
发布者:发布事件,通过ApplicationEventPublisher
发布事件。
监听者:监听并处理事件,实现ApplicationListener
接口或者使用@EventListener
注解。
事件类
只要继承org.springframework.context.ApplicationEvent
,便是一个 Spring Event 类。一般我们会专门为一个类型的 Event 写一个抽象的事件类,作为该类型的所有事件的父类。
/**
* 账户相关的事件
*/
public abstract class AccountEvent extends ApplicationEvent {
/**
* 该类型事件携带的信息
*/
private AccountEventData eventData;
/**
*
* @param source 最初触发该事件的对象
* @param eventData 该类型事件携带的信息
*/
public AccountEvent(Object source, AccountEventData eventData) {
super(source);
this.eventData = eventData;
}
public AccountEventData getEventData() {
return eventData;
}
}
复制代码
然后定义具体的发布事件。这里推荐使用类实现的方式来发布具体的事件,而不是在事件中使用private String eventType
定义事件的类型。使用具体的类表示具体的事件,监听器只要监听具体的事件类即可,而无需再做判断,同时也不需要再另外维护事件类型列表。
public class AccountCreatedEvent extends AccountEvent {
public AccountCreatedEvent(Object source, AccountEventData eventData) {
super(source, eventData);
}
}
复制代码
还有一种实践是,利用泛型定义一个统一的父类。
public abstract class BaseEvent<T> extends ApplicationEvent {
/**
* 该类型事件携带的信息
*/
private T eventData;
/**
*
* @param source 最初触发该事件的对象
* @param eventData 该类型事件携带的信息
*/
public BaseEvent(Object source, T eventData) {
super(source);
this.eventData = eventData;
}
public T getEventData() {
return eventData;
}
}
复制代码
然后再指定事件的泛型类型即可。
public class AccountCreatedEvent extends BaseEvent<AccountEventData> {
public AccountCreatedEvent(Object source, AccountEventData eventData) {
super(source, eventData);
}
}
public class TodoCreatedEvent extends BaseEvent<TodoEventData> {
public TodoCreatedEvent(Object source, TodoEventData eventData) {
super(source, eventData);
}
}
复制代码
以上均使用了一个AccountEventData
,这是为了方便拓展,如果后续需要给事件增加新的字段,可以直接在该类上增加即可,而无需修改所有的子事件类。
发布者
发布者负责发布消息,有三种实现方式。Spring 容器中默认的ApplicationEventPublisher
是AbstractApplicationContext
,同时AbstractApplicationContext
也是ApplicationContext
的一个子类,也就是说,Spring 默认使用AbstractApplicationContext
发布事件。
方式 1:直接使用ApplicationEventPublisher
(推荐)
import org.springframework.context.ApplicationEventPublisher;
public class AccountsController {
@PostMapping("")
public Account createAccount(@RequestBody Account account) {
...
publisher.publishEvent(new AccountCreatedEvent(this, new AccountEventData()));
return account;
}
}
复制代码
方式 2:实现ApplicationEventPublisherAware
接口(推荐)
public interface ApplicationEventPublisherAware extends Aware {
void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher);
}
复制代码
这个方式其实就是注入一个ApplicationEventPublisher
,然后再用ApplicationEventPublisher#publisheEvent(ApplicationEvent)
方法发布事件。
package org.springframework.data.rest.webmvc;
@RepositoryRestController
class RepositoryEntityController extends AbstractRepositoryRestController implements ApplicationEventPublisherAware {
private ApplicationEventPublisher publisher;
@Override
public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
this.publisher = publisher;
}
private ResponseEntity<ResourceSupport> saveAndReturn(Object domainObject, RepositoryInvoker invoker,
HttpMethod httpMethod, PersistentEntityResourceAssembler assembler, boolean returnBody) {
publisher.publishEvent(new BeforeSaveEvent(domainObject));
Object obj = invoker.invokeSave(domainObject);
publisher.publishEvent(new AfterSaveEvent(obj));
...
}
...
}
复制代码
如果你希望你的 Service 类能够发布事件,可以实现这个接口ApplicationEventPublisherAware
。
方式 3:使用ApplicationContext#publishEvent(ApplicationEvent)
发布。
public class AccountEventPublisher {
private final ApplicationContext applicationContext;
public AccountEventPublisher(ApplicationContext context) {
this.applicationContext = context;
}
public void publish(TodoEvent ev) {
applicationContext.publishEvent(ev);
}
}
复制代码
ApplicationContext
是ApplicationEventPublisher
的一个实现,在有前面的两种方案之后,其实就不需要这个重复封装实现方案了。当然,你也可以直接使用ApplicationContext
。
监听器
监听器负责接收和处理事件。
基本用法
有两种实现方法:实现ApplicationListener
接口或者使用@EventListener
注解。
实现ApplicationListener
接口:
public class TodoFinishedListener implements ApplicationListener<TodoEvent.TodoFinishedEvent> { @Override public void onApplicationEvent(TodoEvent.TodoFinishedEvent event) { // do something }}
复制代码
使用@EventListener
注解(推荐)
@Slf4j@Componentpublic class SyncAccountListener { /** * 异步发送邮件 * @param event */ @EventListener public void doOnNormalEvent(NormalAccountEvent event) { try { log.debug("befor"); Thread.sleep(1000); log.debug("after"); } catch (InterruptedException e) { log.error(e.getMessage(), e); } }}
复制代码
可以使用@EventListener
在同一个类中用不同的方法监听多个不同的事件。相对于实现ApplicationListener
接口,使用@EventListener
会更加灵活。
@EventListener
注解的说明
@EventListener(value = {AccountCreatedEvent.class, AccountUpdatedEvent.class}, condition = "#event.account.age > 10")public void doSomethingOnAccountEvent(AccountEvent event) { // TODO}
复制代码
value
: 监听的事件(组),用于支持同一父类的事件
class
: 同 value
condition
: SpEL,使得 Event Handler 变得 conditional
#root.event
, Application 的引用
#root.args
, 表示方法参数,#root.args[0]表示第 0 个方法参数
#<name>
, 如上面代码中的 #event 表示以参数名关联参数
ApplicationEventMulticaster 事件广播器
事件广播器负责将ApplicationEventPublisher
发布的事件广播给所有的监听器。如果没有提供事件广播器,Spring 会自动使用SimpleApplicationEventMulticaster
作为默认的事件广播器。
构建事件基础
AbstractApplicationContext.java 中的refresh()
方法中构建了完整的事件基础。AbstractApplicationContext#initApplicationEventMulticaster()
初始化了事件广播器,AbstractApplicationContext#registerListeners()
则负责添加 Spring 容器中的事件监听器。
/**
* Initialize the ApplicationEventMulticaster.
* Uses SimpleApplicationEventMulticaster if none defined in the context.
* @see org.springframework.context.event.SimpleApplicationEventMulticaster
*/
protected void initApplicationEventMulticaster() {
ConfigurableListableBeanFactory beanFactory = getBeanFactory();
if (beanFactory.containsLocalBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME)) {
this.applicationEventMulticaster =
beanFactory.getBean(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, ApplicationEventMulticaster.class);
if (logger.isTraceEnabled()) {
logger.trace("Using ApplicationEventMulticaster [" + this.applicationEventMulticaster + "]");
}
}
else {
this.applicationEventMulticaster = new SimpleApplicationEventMulticaster(beanFactory);
beanFactory.registerSingleton(APPLICATION_EVENT_MULTICASTER_BEAN_NAME, this.applicationEventMulticaster);
if (logger.isTraceEnabled()) {
logger.trace("No '" + APPLICATION_EVENT_MULTICASTER_BEAN_NAME + "' bean, using " +
"[" + this.applicationEventMulticaster.getClass().getSimpleName() + "]");
}
}
}
复制代码
/**
* Add beans that implement ApplicationListener as listeners.
* Doesn't affect other listeners, which can be added without being beans.
*/
protected void registerListeners() {
// Register statically specified listeners first.
for (ApplicationListener<?> listener : getApplicationListeners()) {
getApplicationEventMulticaster().addApplicationListener(listener);
}
// Do not initialize FactoryBeans here: We need to leave all regular beans
// uninitialized to let post-processors apply to them!
String[] listenerBeanNames = getBeanNamesForType(ApplicationListener.class, true, false);
for (String listenerBeanName : listenerBeanNames) {
getApplicationEventMulticaster().addApplicationListenerBean(listenerBeanName);
}
// Publish early application events now that we finally have a multicaster...
Set<ApplicationEvent> earlyEventsToProcess = this.earlyApplicationEvents;
this.earlyApplicationEvents = null;
if (earlyEventsToProcess != null) {
for (ApplicationEvent earlyEvent : earlyEventsToProcess) {
getApplicationEventMulticaster().multicastEvent(earlyEvent);
}
}
}
复制代码
事件发布
我们可以在AbstractApplicationContext.java
中找到事件的直接发布方法。AbstractApplicationContext#publish(Object, ResolvableType)
中,事件的发布是通过ApplicationEventMulticaster
做的广播发布。
protected void publishEvent(Object event, @Nullable ResolvableType eventType) {
Assert.notNull(event, "Event must not be null");
// Decorate event as an ApplicationEvent if necessary
ApplicationEvent applicationEvent;
if (event instanceof ApplicationEvent) {
applicationEvent = (ApplicationEvent) event;
}
else {
applicationEvent = new PayloadApplicationEvent<>(this, event);
if (eventType == null) {
eventType = ((PayloadApplicationEvent<?>) applicationEvent).getResolvableType();
}
}
// Multicast right now if possible - or lazily once the multicaster is initialized
if (this.earlyApplicationEvents != null) {
this.earlyApplicationEvents.add(applicationEvent);
}
else {
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
}
// Publish event via parent context as well...
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext) {
((AbstractApplicationContext) this.parent).publishEvent(event, eventType);
}
else {
this.parent.publishEvent(event);
}
}
}
复制代码
在前面我们可以知道,SimpleApplicationEventMulticaster
是ApplicationEventMulticaster
在 Spring 容器中的默认实现。理所当然地可以从中找到事件发布的真实方式。multicastEvent
方法会找到监听当前事件的所有监听器,然后再执行执行监听方法。
SimpleApplicationEventMulticaster
中有两个属性,Executor taskExecutor
和ErrorHandler errorHandler
。前者可以定义所有监听器是否异步执行,默认为 null,等价于同步执行的SyncTaskExecutor
,你也可以使用SimpleAsyncTaskExecutor
将所有监听器设置为异步执行。但这里有一点非常重要,如果你设置了 Executor 为异步的,那么所有的监听器都会异步执行,监听器和调用类会处于不同的上下文,不同的事务中,除非你有办法让 TaskExecutor 支持。其实我们完全不用通过修改广播器 taskExecutor 的方式来让监听器异步,可以通过@EnableAsync
启动异步,并@Async
将监听器设置为异步执行。通过@Async
的方式,可以自由地决定任意一个监听器是否为异步,而非暴力地让所有的监听器都异步化。
public void multicastEvent(final ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
Executor executor = getTaskExecutor();
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
if (executor != null) {
executor.execute(() -> invokeListener(listener, event));
}
else {
invokeListener(listener, event);
}
}
}
复制代码
而ErrorHandler errorHandler
则定义了在监听器发生异常之后的行为,在这里你可以看到,如果没有定义errorHandler
的话,会直接抛到上一层。
protected void invokeListener(ApplicationListener<?> listener, ApplicationEvent event) {
ErrorHandler errorHandler = getErrorHandler();
if (errorHandler != null) {
try {
doInvokeListener(listener, event);
}
catch (Throwable err) {
errorHandler.handleError(err);
}
}
else {
doInvokeListener(listener, event);
}
}
private void doInvokeListener(ApplicationListener listener, ApplicationEvent event) {
try {
listener.onApplicationEvent(event);
}
catch (ClassCastException ex) {
String msg = ex.getMessage();
if (msg == null || matchesClassCastMessage(msg, event.getClass())) {
// Possibly a lambda-defined listener which we could not resolve the generic event type for
// -> let's suppress the exception and just log a debug message.
Log logger = LogFactory.getLog(getClass());
if (logger.isTraceEnabled()) {
logger.trace("Non-matching event type for listener: " + listener, ex);
}
}
else {
throw ex;
}
}
}
复制代码
可以修改默认的ApplicationEventMulticaster
,或者直接继承/实现AbstractApplicationEventMulticaster/ApplicationEventMulticaster
。
// 是谁,是谁让我所有的监听器都编程异步了。-》原来是有人修改了事件广播器的taskExecutor为异步的了。public class AsyncConfig { @Bean public ApplicationEventMulticaster simpleApplicationEventMulticaster() { SimpleApplicationEventMulticaster eventMulticaster = new SimpleApplicationEventMulticaster(); eventMulticaster.setTaskExecutor(new SimpleAsyncTaskExecutor()); return eventMulticaster; }}
复制代码
同步
Listener 默认都是同步的
public Account createAccount(Account account) { ... try { publisher.publishEvent(new ThrowExceptionAccountEvent(this, new AccountEventData())); } catch (Exception e) { // 捕获同步Listener抛出的异常 throw new ServiceException(e.getMessage(), e); } ... return account;}
复制代码
以上的方法中,publishEvent
会执行完所有的同步监听器之后才返回。既然是同步,那么你可以通过@Order
注解来指定执行顺序。使用同步的监听器,可以让事件参与到 publisher 所在的事务中。从上面对ApplicationEventMulticaster
的讲解中可以知道,同步的执行其实就是简单的方法调用罢了。
异常与事务
上面已经讲到,同步的 listener 如果发生异常,而且没有被 ErrorHandler 拦截的话,是会往上抛出的,可以直接在 publishEvent 方法调用处捕获。
在同步的场景下,listener 的执行,其实就是普通方法的调用。那么事务的控制也是和普通的方法调用是一样的。如果想要让监听器在事务中,那么就在监听器方法上添加事务注解@Transational
就可以了。具体的分析见 Spring 的事务传播行为。
异步
既然前面讲到,监听器的执行其实就是一个普通方法的执行,那么将监听器声明为异步的方法也会和将一个普通方法声明为异步的方法一样,使用@Async
。
需要明确的一点是,当监听器设置为异步之后,会导致该监听器方法和调用 publishEvent 的方法处于不同的事务中。其实就和普通异步没有太多区别啦。
使用 @Async 实现异步
启动异步
@EnableAsync@Configurationpublic class AsyncConfig implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(10); executor.setMaxPoolSize(20); executor.setQueueCapacity(1000); executor.setKeepAliveSeconds(300); executor.setThreadNamePrefix("dspk-Executor-"); // 拒绝策略 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } /** * 异常处理器 */ @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new SimpleAsyncUncaughtExceptionHandler(); }}
复制代码
默认的AsyncConfigurer
是AsyncConfigurerSupport
,两个方法均返回了 null。
设置一个监听器为异步
@Componentpublic class AccountListener { @Async @EventListener public void sendEmailOnAccountCreatedEvent(AccountCreateEvent event) { // 发送邮件 // do something else }}
复制代码
使用 ApplicationEventMulticaster 实现异步
为ApplicationEventMulticaster
指定一个异步的 taskExecutor,将会让所有的监听器都变成异步执行。真心不推荐这种做法。
public class AsyncConfig { @Bean public ApplicationEventMulticaster simpleApplicationEventMulticaster() { SimpleApplicationEventMulticaster eventMulticaster = new SimpleApplicationEventMulticaster(); eventMulticaster.setTaskExecutor(new SimpleAsyncTaskExecutor()); return eventMulticaster; }}
复制代码
事件事务管理 @TransactionalEventListener
定义
使用@TransactionalEventListener
是@EventListener
的拓展,可以指定监听器和发布事件的方法的事务隔离级别。隔离级别确保数据的有效性。@TransactionalEventListener
注解会将监听器声明为一个事务管理器(这部分有机会会在其他文章中说明)。
当一个监听器方法被@TransactionalEventListener
注解时,那么这个监听器只会在调用方为事务方法时执行,如果调用方是非事务方法,则无法该监听器不会被通知。值得注意的是,虽然@TransactionalEventListener
带有 Transaction 关键字,但这个方法并没有声明监听器为 Transactional 的。
@TransactionalEventListenerpublic void afterRegisterSendMail(MessageEvent event) { mailService.send(event);}
复制代码
配置 @TransactionalEventListener
除了含有与@EventListener
相同的属性(classes, condition),该注解还提供了另外的两个属性fallbackExecution
和phase
。
@TransactionalEventListener 注解属性:fallbackExecution
定义:fallbackExecution
设置 Listener 是否要在没有事务的情况下处理 event。
默认为 false,表示当 publishEvent 所在方法没有事务控制时,该监听器不监听事件。通过设置fallbackExecution=true
,可以让 Listener 在任何情况都可以执行。
@Transactional
public void txMethod() {
publisher.publishEvent(new MessageEvent());
}
public void nonTxMethod() {
publisher.publishEvent(new MessageEvent());
}
// txMethod: 执行
// nonTxMethod: 不执行
@TransactionalEventListener
public void afterRegisterSendMail(MessageEvent event) {
mailService.send(event);
}
// txMethod: 执行
// nonTxMethod: 执行
@TransactionalEventListener(fallbackExecution = true)
public void afterRegisterSendMail(MessageEvent event) {
mailService.send(event);
}
复制代码
@TransactionalEventListener 注解属性:phase
AFTER_COMMIT
- (default) 在事务完成之后触发事件。此时事务已经结束,监听器方法将找不到上一个事务
AFTER_ROLLBACK
– 在事务回滚之后触发事件
AFTER_COMPLETION
– 在事务完成之后触发事件 (含有 AFTER_COMMIT
和AFTER_ROLLBACK
)
BEFORE_COMMIT
- 在事务提交之前触发事件,此时调用方方法的事务还存在,监听器方法可以找到该事务
当你尝试在@TransactionEventListener
方法中执行 JPA 的 save 方法时,你会得到如下错误:
@EventListener
@TransactionalEventListener
public void doOnNormalEvent3(NormalAccountEvent event) {
Account account = new Account();
account.setPassword("33333");
account.setFirstName("333");
account.setLastName("333");
account.setEmail("33333@gr.com");
accountRepository.save(account);
}
复制代码
Participating transaction failed - marking existing transaction as rollback-only
Setting JPA transaction on EntityManager [SessionImpl(1991443937<open>)] rollback-only
...
org.springframework.dao.InvalidDataAccessApiUsageException: no transaction is in progress; nested exception is javax.persistence.TransactionRequiredException: no transaction is in progress
复制代码
原因是@TransactionalEventListener
默认是AFTER_COMMIT
,也就是当前事务已经结束了,所以无法找到所在事务,只能执行 rollback,因而无法成功将数据保存到数据库中。但如果你希望执行 findAll()方法,那么会拿到调用方提交到数据库中的数据,但也拿不到该 Listener 中 save 的数据。也许你会想,我在这个方法上@Transactional
不就可以了吗?目前的测试结果是也不行。具体原因会在写事务的文章中再另外讲解,这里暂不拓展。可以通过设置 phase 为TransactionPhase.BEFORE_COMMIT
进行解决,这样的话调用方的事务就还没有结束。
@Transactional
@TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.BEFORE_COMMIT)
public void doOnNormalEvent3(NormalAccountEvent event) {
...
}
复制代码
但这并不表示我们不应该使用 AFTER_COMMIT
,而是应该在合适的场景下使用。比如需要发送邮件。在下面的方法中,方法 B 会触发发送邮件,但由于方法 A 中还有操作,这些操作可能导致方法 B 回退,为了防止邮件提前发送(无法撤销操作),因此可以让发送邮件在事务提交之后再执行。
methodA() {
methodB();
// do something
}
methodB() {
publisher.publishEvent(new EventB());
}
@TransactionalEventListener
sendEmailOnEventB() {
// send Email
}
复制代码
参考
评论