写点什么

设计模式之订阅发布模式

  • 2023-05-27
    湖北
  • 本文字数:3873 字

    阅读完需:约 13 分钟

设计模式之订阅发布模式

一、简介

订阅发布模式(Publish-Subscribe Pattern)是一种行之有效的解耦框架与业务逻辑的方式,也是一种常见的观察者设计模式,它被广泛应用于事件驱动架构中。


在这个模式中,发布者(或者说是主题)并不直接发送消息给订阅者,而是通过调度中心(或者叫消息代理)来传递消息。 发布者(或者说是主题)并不知道订阅者的存在,而订阅者也不知道发布者的存在。他们彼此唯一的关系就是在调度中心注册成为订阅者或者发布者。


当一个发布者有新消息时,就将这个消息发布到调度中心。调度中心就会将这个消息通知给所有订阅者。这就实现了发布者和订阅者之间的解耦,发布者和订阅者不再直接依赖于彼此,他们可以独立地扩展自己。


在具体的实现中,可以通过消息队列、事件总线等机制来实现调度中心,不同语言和平台都有实现的库和框架,例如 Java 中的 ActiveMQ、RabbitMQ、Kafka 等。


订阅发布模式有以下优点:


  1. 性能好,发布者发送消息后直接返回不需要等待消费者处理完毕。

  2. 解耦性较强,发布者和订阅者之间不存在直接依赖,满足高内聚低耦合的设计思想。

  3. 可以支持一对多、多对多的消息通信模型,提供了更加灵活的消息传递方式。

  4. 可以动态地增加或删除发布者和订阅者,扩展性较好。

二、Java 实现发布订阅模式

  1. 创建订阅者接口,用于接受消息通知。


interface Subscriber {    void update(String message);}
复制代码


  1. 创建发布者,用于发布消息。实现了增加、删除和发布的功能,并且维护了一个订阅列表,


class Publisher {    private Map<String, List<Subscriber>> subscribers = new HashMap<>();
public void subscribe(String topic, Subscriber subscriber) { List<Subscriber> subscriberList = subscribers.get(topic); if (subscriberList == null) { subscriberList = new ArrayList<>(); subscribers.put(topic, subscriberList); } subscriberList.add(subscriber); }
public void unsubscribe(String topic, Subscriber subscriber) { List<Subscriber> subscriberList = subscribers.get(topic); if (subscriberList != null) { subscriberList.remove(subscriber); } }
public void publish(String topic, String message) { List<Subscriber> subscriberList = subscribers.get(topic); if (subscriberList != null) { for (Subscriber subscriber : subscriberList) { subscriber.update(message); } } }}
复制代码


  1. 我们还实现了两个不同的 Subscriber 实现,一个是 EmailSubscriber,另一个是 SMSSubscriber,用于接受发布者的消息并将其分别发送到邮箱和手机上。


class EmailSubscriber implements Subscriber {    private String email;
public EmailSubscriber(String email) { this.email = email; }
public void update(String message) { System.out.println("Send email to " + email + ": " + message); }}
class SMSSubscriber implements Subscriber { private String phoneNumber;
public SMSSubscriber(String phoneNumber) { this.phoneNumber = phoneNumber; }
public void update(String message) { System.out.println("Send SMS to " + phoneNumber + ": " + message); }}
复制代码


  1. 在 Main 类中,我们创建了一个 Publisher 对象,并添加了两个 EmailSubscriber 和两个 SMSSubscriber,分别订阅了 news 主题的更新。我们先给这个主题发送一条消息,然后取消 news 主题的其中一个订阅者,最后我们再次给 news 主题发送一条消息。


public class Main {    public static void main(String[] args) {        Publisher publisher = new Publisher();
Subscriber emailSubscriber1 = new EmailSubscriber("foo@example.com"); Subscriber smsSubscriber1 = new SMSSubscriber("1234567890");
publisher.subscribe("news", emailSubscriber1); publisher.subscribe("news", smsSubscriber1);
publisher.publish("news", "发布新消息1"); publisher.unsubscribe("news", smsSubscriber1); publisher.publish("news", "发布新消息2"); }}
复制代码


打印输出如下:


Send email to foo@example.com: 发布新消息1Send SMS to 1234567890: 发布新消息1Send email to foo@example.com: 发布新消息2
复制代码

三、Spring 中自带的订阅发布模式

Spring 的订阅发布模式是通过发布事件、事件监听器和事件发布器 3 个部分来完成的


这里我们通过 newbee-mall-pro 项目中已经实现订阅发布模式的下单流程给大家讲解,项目地址:https://github.com/wayn111/newbee-mall-pro


  1. 自定义订单发布事件,继承 ApplicationEvent


public class OrderEvent extends ApplicationEvent {  void onApplicationEvent(Object event) {    ...  }}
复制代码


  1. 定义订单监听器,实现 ApplicationListener


@Componentpublic class OrderListener implements ApplicationListener<OrderEvent> {    @Override    public void onApplicationEvent(OrderEvent event) {    // 生成订单、删除购物车、扣减库存    ...    }}
复制代码


  1. 下单流程,通过事件发布器 applicationEventPublisher 发布订单事件,然后再订单监听器中处理订单保存逻辑。


@Resourceprivate ApplicationEventPublisher applicationEventPublisher;
private void saveOrder(MallUserVO mallUserVO, Long couponUserId, List<ShopCatVO> shopcatVOList, String orderNo) { // 订单检查 ... // 生成订单号 String orderNo = NumberUtil.genOrderNo(); // 发布订单事件,在事件监听中处理下单逻辑 applicationEventPublisher.publishEvent(new OrderEvent(orderNo, mallUserVO, couponUserId, shopcatVOList)); // 所有操作成功后,将订单号返回 return orderNo; ...}
复制代码


通过事件监听机制,我们将下单逻辑拆分成如下步骤:


  1. 订单检查

  2. 生成订单号

  3. 发布订单事件,在事件监听中处理订单保存逻辑

  4. 所有操作成功后,将订单号返回每个步骤都是各自独立不互相影响


如上的代码已经实现了订阅发布模式,成功解耦了下单逻辑。但是在性能上还没有得到优化,因为 Spring Boot 项目中,默认情况下事件监听器是同步处理的,也就是说这里下单流程会等待事件监听器处理完毕才返回,最终影响接口响应时长。

四、使用异步的事件监听发布类

Spring Boot 项目中事件监听发布类是由 SimpleApplicationEventMulticaster 这个类实现的,源码中通知订阅者代码如下:



可以看到,代码里是有判断 getTaskExecutor() 方法返回不为空的话,就交由 executor 执行,负责同步执行。这个时候大家就要问了,这里不是有线程池在异步通知订阅者吗?


不急,博主带大家继续查看源码。



可以看到 getTaskExecutor() 方法返回一个成员属性,这个成员属性在 SimpleApplicationEventMulticaster 类中是通过 setTaskExecutor(@Nullable Executor taskExecutor) 方法设置的。我们通过 ctrl + f7 查一下 setTaskExecutor(...) 方法在哪里被调用过,



Ok,到此水落石出,SimpleApplicationEventMulticaster 类的 taskExecutor 成员属性一直为 null,所以在通过订阅者的时候一直是同步处理,等待订阅者处理完毕。




对于异步处理,我们可以从 2 个方面入手:


  1. 事件监听器入手,将事件监听器的事件触发方法改为异步执行,例如将生成订单、删除购物车、扣减库存逻辑放入线程池异步执行,或者是在订阅者的通知方法 onApplicationEvent 上加上@Async注解,表示该方法异步执行。

  2. 事件监听发布类入手,设置默认事件监听发布类的taskExecutor属性,通过源码可知,也可以解决。


这里博主给大家介绍下怎么修改事件监听发布类来解决。


/** * 系统启动时執行 */@Componentpublic class SpringBeanStartupRunner implements ApplicationRunner {
@Override public void run(ApplicationArguments args) throws Exception { // 设置spring默认的事件监听为异步执行 SimpleApplicationEventMulticaster multicaster = SpringContextUtil.getBean(SimpleApplicationEventMulticaster.class); ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 5, 10, 60L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(500), new CustomizableThreadFactory("newbee—event-task"), new ThreadPoolExecutor.CallerRunsPolicy() ); multicaster.setTaskExecutor(threadPoolExecutor); }}
复制代码


在系统启动时反射修改SimpleApplicationEventMulticaster类的taskExecutor属性,从而让SimpleApplicationEventMulticaster类支持异步事件通知。

总结

建议大家在日常开发中多加思考哪些业务流程可以适用,例如微服务项目中订单支付成功后需要通知用户、商品、活动等多个服务时,可以考虑使用订阅发布模式。解耦发布者和订阅者,发布者只管发布消息,不需要知道有哪些订阅者,也不需要知道订阅者的具体实现。订阅者只需要关注自己感兴趣的消息即可。这种松耦合的设计使得系统更容易扩展和维护。


关注公众号【waynblog】每周分享技术干货、开源项目、实战经验、高效开发工具等,您的关注将是我的更新动力!

发布于: 2023-05-27阅读数: 19
用户头像

waynaqua 2020-03-10 加入

java开发工程师

评论

发布
暂无评论
设计模式之订阅发布模式_设计模式_越长大越悲伤_InfoQ写作社区