MASA Framework 事件总线 - 进程内事件总线
概述
事件总线是一种事件发布/订阅结构,通过发布订阅模式可以解耦不同架构层级,同样它也可以来解决业务之间的耦合,它有以下优点
松耦合
横切关注点
可测试性
事件驱动
发布订阅模式
通过下图我们可以快速了解发布订阅模式的本质
订阅者将自己关心的事件在调度中心进行注册
事件的发布者通过调度中心把事件发布出去
订阅者收到自己关心的事件变更并执行相对应业务
其中发布者无需知道订阅者是谁,订阅者彼此之间也互不认识,彼此之间互不干扰
事件总线类型
在 Masa Framework 中,将事件划分为
进程内事件 (Event)
本地事件,它的发布与订阅需要在同一个进程中,订阅方与发布方需要在同一个项目中
跨进程事件 (IntegrationEvent)
集成事件,它的发布与订阅一定不在同一个进程中,订阅方与发布方可以在同一个项目中,也可以在不同的项目中
下面我们会用一个注册用户的例子来说明如何使用本地事件
入门
安装.NET 6.0
新建 ASP.NET Core 空项目
Assignment.InProcessEventBus
,并安装Masa.Contrib.Dispatcher.Events
注册 EventBus (用于发布本地事件), 修改
Program.cs
新增
RegisterUserEvent
类并继承Event
,用于发布注册用户事件
新增
注册用户
处理程序
在指定事件处理程序方法上增加特性 EventHandler,并在方法中增加参数 RegisterUserEvent
注册用户的处理程序可以放到任意一个类中,但其构造函数参数必须支持从 DI 获取,且处理程序的方法仅支持
Task
或Void
两种, 不支持其它类型
发送注册用户事件,修改
Program.cs
进阶
处理流程
EventBus 的 请求管道包含一系列请求委托,依次调用。 它们与ASP.NET Core中间件有异曲同工之妙,区别点在于中间件的执行顺序与注册顺序相反,最先注册的最后执行
每个委托均可在下一个委托前后执行操作,其中TransactionMiddleware
是 EventBus 发布后第一个要进入的中间件 (默认提供),并且它是不支持多次嵌套的。
EventBus 支持嵌套,这意味着我们可以在 Handler 中重新发布一个新的
Event
,但TransactionMiddleware
仅会在最外层进入时被触发一次
自定义中间件
根据需要我们可以自定义中间件,并注册到 EventBus 的请求管道中,比如通过增加FluentValidation
, 将参数验证从业务代码中剥离开来,从而使得处理程序更专注于业务
注册
FluentValidation
, 修改Program.cs
自定义验证中间件
ValidatorMiddleware.cs
,用于验证参数
注册 EventBus 并使用验证中间件
ValidatorMiddleware
添加注册用户验证类
RegisterUserEventValidator.cs
编排
EventBus 支持事件编排,它们可以用来处理一些对执行顺序有要求的业务,比如: 注册用户必须成功之后才可以发送注册邮件通知,发送奖励等等,那我们可以这样做
将注册用户业务拆分为三个 Handler,并通过指定 Order 的值来对执行事件排序
Saga
EventBus 支持 Saga 模式
具体是怎么做呢?
当发送奖励出现异常时,则执行补偿机制,执行顺序为 (2 - 1) > 0,由于目前仅存在一个 Order 为 1 的 Handler,则执行奖励补偿后退出
但对于部分不需要执行失败但不需要执行回退的方法,我们可以修改 FailureLevels
确保不会因为当前方法的异常而导致执行补偿机制
源码解读
EventHandler
FailureLevels: 失败级别, 默认: Throw
Throw:发生异常后,依次执行 Order 小于当前 Handler 的 Order 的取消动作,比如:Handler 顺序为 1、2、3,CancelHandler 为 1、2、3,如果执行 Handler3 异常,则依次执行 2、1
ThrowAndCancel:发生异常后,依次执行 Order 小于等于当前 Handler 的 Order 的取消动作,比如:Handler 顺序为 1、2、3,CancelHandler 为 1、2、3,如果执行 Handler3 异常,则依次执行 3、2、1
Ignore:发生异常后,忽略当前异常(不执行取消动作),继续执行其他 Handler
Order: 执行顺序,默认: int.MaxValue,用于控制当前方法的执行顺序
EnableRetry: 当 Handler 异常后是否启用重试, 默认: false
RetryTimes: 重试次数,当出现异常后执行多少次重试, 需开启重试配置
IsCancel: 是否是补偿机制,默认: false
Middleware<TEvent>
SupportRecursive: 是否支持递归 (嵌套), 默认: true
部分中间件仅在最外层被触发一次,像
TransactionMiddleware
就是如此,但也有很多中间件是需要被多次执行的,比如ValidatorMiddleware
,每次发布事件时都需要验证参数是否正确HandleAsync(TEvent @event, EventHandlerDelegate next): 处理程序,通过调用
next()
使得请求进入下一个Handler
IEventHandler<TEvent> 与 ISagaEventHandler<TEvent>
HandleAsync(TEvent @event): 提供事件的 Handler
CancelAsync(TEvent @event): 提供事件的补偿 Handler
与
EventHandler
功能类似,提供基本的 Handler 以及补偿 Handler,推荐使用EventHandler
的方式使用
TransactionMiddleware
提供事务中间件,当EventBus
与UoW
以及 Masa 提供的Repository
来使用时,当存在待提交的数据时,会自动执行保存并提交,当出现异常后,会执行事务回滚,无需担心脏数据入库
性能测试
与市面上使用较多的MeidatR
作了对比,结果如下图所示:
BenchmarkDotNet=v0.13.1, OS=Windows 10.0.19043.1023 (21H1/May2021Update)11th Gen Intel Core i7-11700 2.50GHz, 1 CPU, 16 logical and 8 physical cores.NET SDK=7.0.100-preview.4.22252.9[Host] : .NET 6.0.6 (6.0.622.26707), X64 RyuJIT DEBUGJob-MHJZJL : .NET 6.0.6 (6.0.622.26707), X64 RyuJIT
Runtime=.NET 6.0 IterationCount=100 RunStrategy=ColdStart
根据性能测试我们发现,EventBus 与 MediatR 性能差距很小,但 EventBus 提供的功能却要强大的多
常见问题
按照文档操作,通过
EventBus
发布事件后,对应的 Handler 并没有执行,也没有发现错误?
①. EventBus.PublishAsync(@event) 是异步方法,确保等待方法调用成功,检查是否出现同步方法调用异步方法的情况②. 注册EventBus
时指定程序集集合, Assembly 被用于注册时获取并保存事件与 Handler 的对应关系
程序集: 手动指定 Assembly 集合 -> MasaApp.GetAssemblies() -> AppDomain.CurrentDomain.GetAssemblies()
但由于 NetCore 按需加载,未使用的程序集在当前域中不存在,因此可能会导致部分事件以及 Handler 的对应关系未正确保存,因此可通过手动指定 Assembly 集合或者修改全局配置中的 Assembly 集合来修复这个问题
通过 EventBus 发布事件,Handler 出错,但数据依然保存到数据库中
①. 检查是否禁用事务
DisableRollbackOnFailure 是否为 true (是否失败时禁止回滚)
UseTransaction 是否为 false (禁止使用事务)
②. 检查当前数据库是否支持回滚。例如: 使用的是 Mysql 数据库,但回滚数据失败,请查看
本章源码
Assignment11
https://github.com/zhenlei520/MasaFramework.Practice
开源地址
MASA.Framework:https://github.com/masastack/MASA.Framework
如果你对我们的 MASA Framework 感兴趣,无论是代码贡献、使用、提 Issue,欢迎联系我们
版权声明: 本文为 InfoQ 作者【MASA技术团队】的原创文章。
原文链接:【http://xie.infoq.cn/article/ddf35ed4ee46046185d8b7ebf】。文章转载请联系作者。
评论