写点什么

MASA Framework 事件总线 - 进程内事件总线

  • 2022-11-24
    浙江
  • 本文字数:4944 字

    阅读完需:约 16 分钟

概述

事件总线是一种事件发布/订阅结构,通过发布订阅模式可以解耦不同架构层级,同样它也可以来解决业务之间的耦合,它有以下优点


  • 松耦合

  • 横切关注点

  • 可测试性

  • 事件驱动


发布订阅模式

通过下图我们可以快速了解发布订阅模式的本质


  1. 订阅者将自己关心的事件在调度中心进行注册

  2. 事件的发布者通过调度中心把事件发布出去

  3. 订阅者收到自己关心的事件变更并执行相对应业务



其中发布者无需知道订阅者是谁,订阅者彼此之间也互不认识,彼此之间互不干扰

事件总线类型

在 Masa Framework 中,将事件划分为



本地事件,它的发布与订阅需要在同一个进程中,订阅方与发布方需要在同一个项目中



集成事件,它的发布与订阅一定不在同一个进程中,订阅方与发布方可以在同一个项目中,也可以在不同的项目中


下面我们会用一个注册用户的例子来说明如何使用本地事件

入门


  1. 新建 ASP.NET Core 空项目Assignment.InProcessEventBus,并安装Masa.Contrib.Dispatcher.Events


dotnet new web -o Assignment.InProcessEventBuscd Assignment.InProcessEventBusdotnet add package Masa.Contrib.Dispatcher.Events --version 0.7.0-preview.7
复制代码


  1. 注册 EventBus (用于发布本地事件), 修改Program.cs


builder.Services.AddEventBus();
复制代码


  1. 新增RegisterUserEvent类并继承Event,用于发布注册用户事件


public record RegisterEvent : Event{    public string Account { get; set; }
public string Email { get; set; }
public string Password { get; set; }}
复制代码


  1. 新增注册用户处理程序


在指定事件处理程序方法上增加特性 EventHandler,并在方法中增加参数 RegisterUserEvent


public class UserHandler{    private readonly ILogger<UserHandler>? _logger;
public UserHandler(ILogger<UserHandler>? logger = null) { //todo: 根据需要可在构造函数中注入其它服务 (需支持从DI获取) _logger = logger; }
[EventHandler] public void RegisterUser(RegisterUserEvent @event) { //todo: 1. 编写注册用户业务 _logger?.LogDebug("-----------{Message}-----------", "检测用户是否存在并注册用户"); //todo: 2. 编写发送注册通知等 _logger?.LogDebug("-----------{Account} 注册成功 {Message}-----------", @event.Account, "发送邮件提示注册成功"); }}
复制代码


注册用户的处理程序可以放到任意一个类中,但其构造函数参数必须支持从 DI 获取,且处理程序的方法仅支持 TaskVoid 两种, 不支持其它类型


  1. 发送注册用户事件,修改Program.cs


app.MapPost("/register", async (RegisterUserEvent @event, IEventBus eventBus) =>{    await eventBus.PublishAsync(@event);});
复制代码

进阶

处理流程

EventBus 的 请求管道包含一系列请求委托,依次调用。 它们与ASP.NET Core中间件有异曲同工之妙,区别点在于中间件的执行顺序与注册顺序相反,最先注册的最后执行



每个委托均可在下一个委托前后执行操作,其中TransactionMiddleware是 EventBus 发布后第一个要进入的中间件 (默认提供),并且它是不支持多次嵌套的。


EventBus 支持嵌套,这意味着我们可以在 Handler 中重新发布一个新的Event,但TransactionMiddleware仅会在最外层进入时被触发一次

自定义中间件

根据需要我们可以自定义中间件,并注册到 EventBus 的请求管道中,比如通过增加FluentValidation, 将参数验证从业务代码中剥离开来,从而使得处理程序更专注于业务


  1. 注册FluentValidation, 修改Program.cs


builder.Services.AddValidatorsFromAssembly(Assembly.GetEntryAssembly());
复制代码


  1. 自定义验证中间件ValidatorMiddleware.cs,用于验证参数


public class ValidatorMiddleware<TEvent> : Middleware<TEvent>    where TEvent : IEvent{    private readonly ILogger<ValidatorMiddleware<TEvent>>? _logger;    private readonly IEnumerable<IValidator<TEvent>> _validators;
public ValidatorMiddleware(IEnumerable<IValidator<TEvent>> validators, ILogger<ValidatorMiddleware<TEvent>>? logger = null) { _validators = validators; _logger = logger; }
public override async Task HandleAsync(TEvent @event, EventHandlerDelegate next) { var typeName = @event.GetType().FullName;
_logger?.LogDebug("----- Validating command {CommandType}", typeName);
var failures = _validators .Select(v => v.Validate(@event)) .SelectMany(result => result.Errors) .Where(error => error != null) .ToList();
if (failures.Any()) { _logger?.LogError("Validation errors - {CommandType} - Event: {@Command} - Errors: {@ValidationErrors}", typeName, @event, failures);
throw new ValidationException("Validation exception", failures); }
await next(); }}
复制代码


  1. 注册 EventBus 并使用验证中间件ValidatorMiddleware


builder.Services.AddEventBus(eventBusBuilder=>eventBusBuilder.UseMiddleware(typeof(ValidatorMiddleware<>)));
复制代码


  1. 添加注册用户验证类RegisterUserEventValidator.cs


public class RegisterUserEventValidator : AbstractValidator<RegisterUserEvent>{    public RegisterUserEventValidator()    {        RuleFor(e => e.Account).NotNull().WithMessage("用户名不能为空");        RuleFor(e => e.Email).NotNull().WithMessage("邮箱不能为空");        RuleFor(e => e.Password)            .NotNull().WithMessage("密码不能为空")            .MinimumLength(6)            .WithMessage("密码必须大于6位")            .MaximumLength(20)            .WithMessage("密码必须小于20位");    }}
复制代码

编排

EventBus 支持事件编排,它们可以用来处理一些对执行顺序有要求的业务,比如: 注册用户必须成功之后才可以发送注册邮件通知,发送奖励等等,那我们可以这样做


将注册用户业务拆分为三个 Handler,并通过指定 Order 的值来对执行事件排序


public class UserHandler{    private readonly ILogger<UserHandler>? _logger;
public UserHandler(ILogger<UserHandler>? logger = null) { _logger = logger; }
[EventHandler(1)] public void RegisterUser(RegisterUserEvent @event) { _logger?.LogDebug("-----------{Message}-----------", "检测用户是否存在并注册用户"); //todo: 编写注册用户业务 }
[EventHandler(2)] public void SendAwardByRegister(RegisterUserEvent @event) { _logger?.LogDebug("-----------{Account} 注册成功 {Message}-----------", @event.Account, "发送注册奖励"); //todo: 编写发送奖励等 }
[EventHandler(3)] public void SendNoticeByRegister(RegisterUserEvent @event) { _logger?.LogDebug("-----------{Account} 注册成功 {Message}-----------", @event.Account, "发送注册成功邮件"); //todo: 编写发送注册通知等 }}
复制代码

Saga

EventBus 支持 Saga 模式



具体是怎么做呢?


[EventHandler(1, IsCancel = true)]public void CancelSendAwardByRegister(RegisterUserEvent @event){    _logger?.LogDebug("-----------{Account} 注册成功,发放奖励失败 {Message}-----------", @event.Account, "发放奖励补偿");}
复制代码


当发送奖励出现异常时,则执行补偿机制,执行顺序为 (2 - 1) > 0,由于目前仅存在一个 Order 为 1 的 Handler,则执行奖励补偿后退出


但对于部分不需要执行失败但不需要执行回退的方法,我们可以修改 FailureLevels 确保不会因为当前方法的异常而导致执行补偿机制


[EventHandler(3, FailureLevels = FailureLevels.Ignore)]public void SendNoticeByRegister(RegisterUserEvent @event){    _logger?.LogDebug("-----------{Account} 注册成功 {Message}-----------", @event.Account, "发送邮件提示注册成功");    //todo: 编写发送注册通知等}
复制代码

源码解读

EventHandler

  • FailureLevels: 失败级别, 默认: Throw

  • Throw:发生异常后,依次执行 Order 小于当前 Handler 的 Order 的取消动作,比如:Handler 顺序为 1、2、3,CancelHandler 为 1、2、3,如果执行 Handler3 异常,则依次执行 2、1

  • ThrowAndCancel:发生异常后,依次执行 Order 小于等于当前 Handler 的 Order 的取消动作,比如:Handler 顺序为 1、2、3,CancelHandler 为 1、2、3,如果执行 Handler3 异常,则依次执行 3、2、1

  • Ignore:发生异常后,忽略当前异常(不执行取消动作),继续执行其他 Handler

  • Order: 执行顺序,默认: int.MaxValue,用于控制当前方法的执行顺序

  • EnableRetry: 当 Handler 异常后是否启用重试, 默认: false

  • RetryTimes: 重试次数,当出现异常后执行多少次重试, 需开启重试配置

  • IsCancel: 是否是补偿机制,默认: false

Middleware<TEvent>

  • SupportRecursive: 是否支持递归 (嵌套), 默认: true

  • 部分中间件仅在最外层被触发一次,像TransactionMiddleware 就是如此,但也有很多中间件是需要被多次执行的,比如ValidatorMiddleware,每次发布事件时都需要验证参数是否正确

  • HandleAsync(TEvent @event, EventHandlerDelegate next): 处理程序,通过调用 next() 使得请求进入下一个Handler

IEventHandler<TEvent> 与 ISagaEventHandler<TEvent>

  • HandleAsync(TEvent @event): 提供事件的 Handler

  • CancelAsync(TEvent @event): 提供事件的补偿 Handler


EventHandler功能类似,提供基本的 Handler 以及补偿 Handler,推荐使用EventHandler的方式使用

TransactionMiddleware

提供事务中间件,当EventBusUoW以及 Masa 提供的Repository来使用时,当存在待提交的数据时,会自动执行保存并提交,当出现异常后,会执行事务回滚,无需担心脏数据入库

性能测试

与市面上使用较多的MeidatR作了对比,结果如下图所示:


BenchmarkDotNet=v0.13.1, OS=Windows 10.0.19043.1023 (21H1/May2021Update)11th Gen Intel Core i7-11700 2.50GHz, 1 CPU, 16 logical and 8 physical cores.NET SDK=7.0.100-preview.4.22252.9[Host] : .NET 6.0.6 (6.0.622.26707), X64 RyuJIT DEBUGJob-MHJZJL : .NET 6.0.6 (6.0.622.26707), X64 RyuJIT


Runtime=.NET 6.0 IterationCount=100 RunStrategy=ColdStart



根据性能测试我们发现,EventBus 与 MediatR 性能差距很小,但 EventBus 提供的功能却要强大的多

常见问题

  1. 按照文档操作,通过EventBus发布事件后,对应的 Handler 并没有执行,也没有发现错误?


①. EventBus.PublishAsync(@event) 是异步方法,确保等待方法调用成功,检查是否出现同步方法调用异步方法的情况②. 注册EventBus时指定程序集集合, Assembly 被用于注册时获取并保存事件与 Handler 的对应关系


var assemblies = new[]{    typeof(UserHandler).Assembly};builder.Services.AddEventBus(assemblies);
复制代码


程序集: 手动指定 Assembly 集合 -> MasaApp.GetAssemblies() -> AppDomain.CurrentDomain.GetAssemblies()


但由于 NetCore 按需加载,未使用的程序集在当前域中不存在,因此可能会导致部分事件以及 Handler 的对应关系未正确保存,因此可通过手动指定 Assembly 集合或者修改全局配置中的 Assembly 集合来修复这个问题


  1. 通过 EventBus 发布事件,Handler 出错,但数据依然保存到数据库中


①. 检查是否禁用事务


  1. DisableRollbackOnFailure 是否为 true (是否失败时禁止回滚)

  2. UseTransaction 是否为 false (禁止使用事务)


②. 检查当前数据库是否支持回滚。例如: 使用的是 Mysql 数据库,但回滚数据失败,请查看

本章源码

Assignment11


https://github.com/zhenlei520/MasaFramework.Practice

开源地址

MASA.Framework:https://github.com/masastack/MASA.Framework


如果你对我们的 MASA Framework 感兴趣,无论是代码贡献、使用、提 Issue,欢迎联系我们



发布于: 刚刚阅读数: 4
用户头像

还未添加个人签名 2021-10-26 加入

MASA技术团队官方账号,我们专注于.NET现代应用开发解决方案,Wechat:MasaStackTechOps ,Website:www.masastack.com

评论

发布
暂无评论
MASA Framework 事件总线 - 进程内事件总线_Framework_MASA技术团队_InfoQ写作社区