写点什么

将 Abp 默认事件总线改造为分布式事件总线

作者:EquatorCoco
  • 2023-12-21
    福建
  • 本文字数:5150 字

    阅读完需:约 17 分钟

原理


本地事件总线是通过 Ioc 容器来实现的。


IEventBus 接口定义了事件总线的基本功能,如注册事件、取消注册事件、触发事件等。


Abp.Events.Bus.EventBus 是本地事件总线的实现类,其中私有成员ConcurrentDictionary<Type, List<IEventHandlerFactory>> _handlerFactories是事件订阅表。通过维护事件订阅表来实现事件处理器的注册和取消注册。当对应类型的事件触发时,通过订阅表查找所有事件处理器,通过 Ioc 容器来获取处理器实例,然后通过反射来调用事件处理器的"HandleEvent"方法。


创建分布式事件总线


首先,我们需要一个分布式事件总线中间件,用来将事件从本地事件总线转发到分布式事件总线。常用的中间件有 RabbitMQ、Kafka、Redis 等。


开源社区已经有实现好的库,本项目参考了 wuyi6216/Abp.RemoteEventBus


这里已经定义好了一个分布式事件总线接口



public interface IDistributedEventBus : IDisposable{ void MessageHandle(string topic, string message);
void Publish(IDistributedEventData eventData);
void Subscribe(string topic);
void Unsubscribe(string topic);
void UnsubscribeAll();}
复制代码


为了兼容本地事件总线,我们需要定义一个分布式事件总线接口,继承自 IEventBus 接口。



public interface IMultipleEventBus : IDistributedEventBus, IEventBus{
}
复制代码


实现自动订阅和事件转发


当注册本地事件时,将订阅分布式事件,事件 Topic 为类型的字符串表现形式


public IDisposable Register(Type eventType, IEventHandlerFactory factory){    GetOrCreateHandlerFactories(eventType);    List<IEventHandlerFactory> currentLists;    if (_handlerFactories.TryGetValue(eventType, out currentLists))    {        lock (currentLists)        {            if (currentLists.Count == 0)            {                //Register to distributed event                this.Subscribe(eventType.ToString());            }            currentLists.Add(factory);        }    }    return new FactoryUnregistrar(this, eventType, factory);}
复制代码


创建 TriggerRemote,此方法用于将本地事件参数打包成为分布式事件消息 payload,并发布该消息


public void TriggerRemote(Type eventType, object eventSource, IEventData eventData){    var exceptions = new List<Exception>();    eventData.EventSource = eventSource;    try    {        var payloadDictionary = new Dictionary<string, object>                {                    { PayloadKey, eventData }                };        var distributedeventData = new DistributedEventData(eventType.ToString(), payloadDictionary);        Publish(distributedeventData);    }
catch (Exception ex) { exceptions.Add(ex); } if (exceptions.Any()) { if (exceptions.Count == 1) { exceptions[0].ReThrow(); }
throw new AggregateException("More than one error has occurred while triggering the event: " + eventType, exceptions); }}
复制代码


当触发本地事件时,将消息转发至分布式事件总线。


在 Trigger 方法中调用 TriggerRemote,事件状态回调和事件异常回调将不会被转发。


if (!(typeof(DistributedEventBusEvent) == eventType   || typeof(DistributedEventBusEvent).IsAssignableFrom(eventType)   || typeof(DistributedEventMessageHandleExceptionData) == eventType   || typeof(DistributedEventHandleExceptionData) == eventType    )){    if (typeof(DistributedEventArgs) != eventType)    {        TriggerRemote(eventType, eventSource, eventData);
}}
复制代码


在消费端接收到分布式事件消息时,从 Topic 中解析类型,转发给本地事件。若此类型在本地事件注册过,则将消息反序列化为本地事件参数,然后触发本地事件。


本地事件处理器将触发最终的处理方法。



public virtual void MessageHandle(string topic, string message){ Logger.Debug($"Receive message on topic {topic}"); try { var eventData = _remoteEventSerializer.Deserialize<DistributedEventData>(message); var eventArgs = new DistributedEventArgs(eventData, topic, message); Trigger(this, new DistributedEventBusHandlingEvent(eventArgs));
if (!string.IsNullOrEmpty(eventData.Type)) { string pattern = @"(.*?)\[(.*?)\]"; Match match = Regex.Match(eventData.Type, pattern); if (match.Success) {
var type = match.Groups[1].Value; var type2 = match.Groups[2].Value;
var localTriggerType = typeFinder.Find(c => c.FullName == type).FirstOrDefault(); var genericType = typeFinder.Find(c => c.FullName == type2).FirstOrDefault();
if (localTriggerType != null && genericType != null) {
if (localTriggerType.GetTypeInfo().IsGenericType && localTriggerType.GetGenericArguments().Length == 1 && !genericType.IsAbstract && !genericType.IsInterface ) { var localTriggerGenericType = localTriggerType.GetGenericTypeDefinition().MakeGenericType(genericType);

if (eventData.Data.TryGetValue(PayloadKey, out var payload)) { var payloadObject = (payload as JObject).ToObject(localTriggerGenericType); Trigger(localTriggerGenericType, this, (IEventData)payloadObject);
} } }

} else { var localTriggerType = typeFinder.Find(c => c.FullName == eventData.Type).FirstOrDefault(); if (localTriggerType != null && !localTriggerType.IsAbstract && !localTriggerType.IsInterface) { if (eventData.Data.TryGetValue(PayloadKey, out var payload)) { var payloadObject = (payload as JObject).ToObject(localTriggerType); Trigger(localTriggerType, this, (IEventData)payloadObject);
}
} } Trigger(this, new DistributedEventBusHandledEvent(eventArgs));
} } catch (Exception ex) { Logger.Error("Consume remote message exception", ex); Trigger(this, new DistributedEventMessageHandleExceptionData(ex, topic, topic)); }}
复制代码


使用


DistributedEventBus 有不同的实现方式,这里以 Redis 为例


启动 Redis 服务


下载 Redis 并启动服务,使用默认端口 6379


配置


生产者和消费者端都需要配置分布式事件总线


首先引用 Abp.DistributedEventBus.Redis,并配置 Abp 模块依赖


[DependsOn(typeof(AbpDistributedEventBusRedisModule))]


在 PreInitialize 方法中配置 Redis 连接信息


 Configuration.Modules.DistributedEventBus().UseRedis().Configure(setting => {     setting.Server = "127.0.0.1:6379"; });
复制代码


用 MultipleEventBus 替换 Abp 默认事件总线


 //todo: 事件总线 Configuration.ReplaceService(  typeof(IEventBus),  () => IocManager.IocContainer.Register(      Component.For<IEventBus>().ImplementedBy<MultipleEventBus>()  ));
复制代码


传递 Abp 默认事件


我们知道在使用仓储时,Abp 会自动触发一些事件,如创建、更新、删除等。我们来测试这些事件是否能通过分布式事件总线来传递。


定义一个实体类,用于传递实体的增删改事件。



public class Person : FullAuditedEntity<int>{
public string Name { get; set; } public int Age { get; set; } public string PhoneNumber { get; set; }
}
复制代码


在消费者端,定义一个事件处理器,用于处理实体的增删改事件。



public class RemoteEntityChangedEventHandler : IEventHandler<EntityUpdatedEventData<Person>>, IEventHandler<EntityCreatedEventData<Person>>, IEventHandler<EntityDeletedEventData<Person>>, ITransientDependency{
void IEventHandler<EntityUpdatedEventData<Person>>.HandleEvent(EntityUpdatedEventData<Person> eventData) { var person = eventData.Entity; Console.WriteLine($"Remote Entity Updated - Name:{person.Name}, Age:{person.Age}, PhoneNumber:{person.PhoneNumber}"); }
void IEventHandler<EntityCreatedEventData<Person>>.HandleEvent(EntityCreatedEventData<Person> eventData) { var person = eventData.Entity; Console.WriteLine($"Remote Entity Created - Name:{person.Name}, Age:{person.Age}, PhoneNumber:{person.PhoneNumber}");
}
void IEventHandler<EntityDeletedEventData<Person>>.HandleEvent(EntityDeletedEventData<Person> eventData) { var person = eventData.Entity; Console.WriteLine($"Remote Entity Deleted - Name:{person.Name}, Age:{person.Age}, PhoneNumber:{person.PhoneNumber}");
}}
复制代码


在生产者端,用 IRepository 对实体进行增删改操作。



var person = new Person(){
Name = "John", Age = 36, PhoneNumber = "18588888888"
};
personRepository.Insert(person);
var person2 = new Person(){
Name = "John2", Age = 36, PhoneNumber = "18588888889"
};personRepository.Insert(person2);
var persons = personRepository.GetAllList();foreach (var p in persons){ p.Age += 1; personRepository.Update(p); Console.WriteLine($"Entity Updated - Name:{p.Name}, Age:{p.Age}, PhoneNumber:{p.PhoneNumber}");
}foreach (var p in persons){ personRepository.Delete(p); Console.WriteLine($"Entity Deleted - Name:{p.Name}, Age:{p.Age}, PhoneNumber:{p.PhoneNumber}");
}
复制代码


运行程序(同时运行消费者端和生产者端),可以看到消费者端打印出了实体的增删改事件。



注意:


分布式事件总线在两个独立系统间传递事件,所以需要定义一个共同的类型对象,用于事件参数的传递。因此消费者端需要引用生产者端的模块,以便获取共同的类型对象。


传递自定义事件


定义 NotificationEventData,用于传递自定义事件。



public class NotificationEventData : EventData{ public int Id { get; set; } public string Title { get; set; }
public string Message { get; set; }
public bool IsRead { get; set; }}
复制代码


在消费者端,定义一个事件处理器,用于处理自定义事件。


public class NotificationEventHandler :    IEventHandler<NotificationEventData>,          ITransientDependency{        void IEventHandler<NotificationEventData>.HandleEvent(NotificationEventData eventData)    {        Console.WriteLine($"Id: {eventData.Id}");        Console.WriteLine($"Title: {eventData.Title}");        Console.WriteLine($"Message: {eventData.Message}");        Console.WriteLine($"IsRead: {eventData.IsRead}");
}}
复制代码


在生产者端,触发自定义事件。


var eventBus = IocManager.Instance.Resolve<IEventBus>();

eventBus.Trigger<NotificationEventData>(new NotificationEventData(){ Title = "Hi", Message = "Customized definition event test!", Id = 100, IsRead = true,});
复制代码


运行程序(同时运行消费者端和生产者端),可以看到消费者端打印出了自定义事件。



文章转载自:林晓lx

原文链接:https://www.cnblogs.com/jevonsflash/p/17917031.html

体验地址:http://www.jnpfsoft.com/?from=001

用户头像

EquatorCoco

关注

还未添加个人签名 2023-06-19 加入

还未添加个人简介

评论

发布
暂无评论
将Abp默认事件总线改造为分布式事件总线_分布式_EquatorCoco_InfoQ写作社区