观察者模式
前言
观察者模式是一种对象行为型模式,其主要优点如下:
它的主要缺点如下:
生活小案列
这就是一个观察者模式的生活案例。当领导有事的时候发布通知到群里,群里的所有人收到通知后做相应的事情。以上案例中可以分为下面几个角色:
监听者(也可以说是观察者):群里面每一个人都是一个监听者。
管理者(对应其他教程中的主题-subject):也就是群,主要有添加(群成员)监听者,移除(群成员)监听者,还有通知所有监听者的功能。
事件(或者说通知):也就是领导发到群里面的消息是一个事件(或者说通知)。
通过上面的例子来整理一下实现一个观察者模式的思路。看看一个流程:领导创建群将相关人员添加到群,然后向群里面发布一个通知。群里面每个人看到这条消息后,做相应的事情,当这个消息与自己无关时,啥也不做。领导在这里面可以看做是应用程序的一个线程,只是程序执行的一个单元而已。接下来就是一般设计模式都有的套路,为了程序的扩展性。上面的几个角色都需要定义成抽象的概念,那么在 Java 里面定义抽象有两种一个是接口一个是抽象类。具体定义成接口还是抽象类根据实际情况自行选择。
抽象概念
为什么要定义成抽象的呢?我们先了解一下抽象的概念,我理解抽象就是对一类事物公共部分的定义。比如说水果,就是对一类事物的抽象定义,说到水果,大家肯定能联想到,多汁且主要味觉为甜味和酸味,可食用的植物果实,有丰富的营养成分。这个就是水果的公共成分,但是水果又分为多种,火龙果,百香果···。抽象的好处:比如今天你家里只有一种水果-火龙果。你爹叫你拿一点水果来吃,那你肯定就能直接把家里唯一的水果火龙果拿过来孝敬你老爹。在这个过程中你爹说的水果而不是火龙果,能够少说一个字从而节约能量多活一纳秒。那么我们可以得出一个结论-使用抽象概念可以延年益寿→_→。开个玩笑,下面言归正传,我说一下我认为抽象的好处:
当接口只定义一个实现类时,方便功能的替换(换一个实现类,在新实现类新增功能。从而避免了对调用方和原实现类原代码的改动)。
方法形参定义为抽象,这时就能实现传入不同的实现类该方法可以实现不同的功能。
统一管理,让程序更规范化,当抽象中定义新的非抽象方法,子类可以直接继承使用。
有了上面的铺垫,很容易理解下面的代码示例。
观察者模式代码示例
观察者模式其实也是发布订阅模式。针对不同的观察者需要有不同的实现方式,所以先创建一个管理者的接口,将其定义为一个抽象概念,方便后续扩展。这个接口相当于-群(管理者)
/**
* 观察者的顶层接口
* @param <T>
*/
public interface ObserverInterface<T> {
//注册监听者
public void registerListener(T t);
//移除监听者
public void removeListener(T t);
//通知监听者
public void notifyListener(DataEvent t);
}
复制代码
定义抽象的监听者接口这个接口相当于-群成员(监听者)
/**
* Listener的顶级接口,为了抽象Listener而存在
*/
public interface MyListener {
void onEvent(DataEvent event);
}
复制代码
定义抽象的事件接口这个接口相当于群里面发布的通知
@Data
public abstract class DataEvent {
private String msg;
}
复制代码
创建管理者的实现类,相当于具体的群(如微信群,钉钉群)
/**
* 循环调用方式的观察者(同步)
*/
@Component
public class LoopObserverImpl implements ObserverInterface<MyListener> {
//监听者的注册列表
private List<MyListener> listenerList = new ArrayList<>();
@Override
public void registerListener(MyListener listener) {
listenerList.add(listener);
}
@Override
public void removeListener(MyListener listener) {
listenerList.remove(listener);
}
@Override
public void notifyListener(DataEvent event) {
for (MyListener myListener : listenerList) {
myListener.onEvent(event);
}
}
}
复制代码
创建两个 event 的实现类,一个是积分事件,一个是短信事件
/**
* 积分事件类
*/
public class ScoreDataEvent extends DataEvent {
private Integer score;
}
/**
* 短信事件类
*/
public class SmsDataEvent extends DataEvent {
private String phoneNum;
}
复制代码
创建两个 listener 的实现类,一个是处理积分的,一个是处理短信的
/**
* MyListener的实现类,分数监听者
*/
@Component
public class MyScoreListener implements MyListener {
@Override
public void onEvent(DataEvent dataEvent) {
if (dataEvent instanceof ScoreDataEvent) {
//...省略业务逻辑
System.out.println("积分处理:" + dataEvent.getMsg());
}
}
}
/**
* MyListener的实现类,短信监听者
*/
@Component
public class MySmsListener implements MyListener {
@Override
public void onEvent(DataEvent dataEvent) {
if (dataEvent instanceof SmsDataEvent) {
//...省略短信处理逻辑
System.out.println("短信处理");
}
}
}
复制代码
观察者模式的要素就到齐了,我们在 main 方法里面跑一下
public class Operator {
public static void main(String[] args) {
//通过spring的AnnotationConfigApplicationContext将com.example.demo.user.admin.design路径下的所有加了spring注解的类都扫描放入spring容器
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext("com.example.demo.user.admin.design");
//从spring容器中获取对应bean的实例
LoopObserverImpl loopObserver = context.getBean(LoopObserverImpl.class);
MyScoreListener scoreL = context.getBean(MyScoreListener.class);
MySmsListener smsL = context.getBean(MySmsListener.class);
//向观察者中注册listener
loopObserver.registerListener(scoreL);
loopObserver.registerListener(smsL);
ScoreDataEvent scoreData = new ScoreDataEvent();
scoreData.setMsg("循环同步观察者");
//发布积分事件,通知监听者
loopObserver.notifyListener(scoreData);
/*******************************************/
//从spring容器获取QueueObserverImpl观察者
QueueObserverImpl queueObserver = context.getBean(QueueObserverImpl.class);
//向观察者中注册listener
queueObserver.registerListener(scoreL);
queueObserver.registerListener(smsL);
ScoreDataEvent scoreData1 = new ScoreDataEvent();
scoreData1.setMsg("队列异步观察者");
//发布积分事件,通知监听者
queueObserver.notifyListener(scoreData1);
}
}
复制代码
接下来看看下面这个新的观察者实现类和上面示例中的的观察者实现类LoopObserverImpl
有什么不同吗
/**
* 启动一个线程循环阻塞队列的观察者,可以实现解耦异步。
*/
@Component
public class QueueObserverImpl implements ObserverInterface<MyListener> {
//监听者的注册列表
private List<MyListener> listenerList = new ArrayList<>();
//创建一个大小为10的阻塞队列
private BlockingQueue<DataEvent> queue = new LinkedBlockingQueue<>(10);
//创建一个线程池
private ExecutorService executorService = new ScheduledThreadPoolExecutor(1, r -> {
Thread t = new Thread(r);
t.setName("com.kangarooking.observer.worker");
t.setDaemon(false);
return t;
});
// private ExecutorService executorService = Executors.newFixedThreadPool(1);
@Override
public void registerListener(MyListener listener) {
listenerList.add(listener);
}
@Override
public void removeListener(MyListener listener) {
listenerList.remove(listener);
}
@Override
public void notifyListener(DataEvent event) {
System.out.println("向队列放入DataMsg:" + event.getMsg());
queue.offer(event);
}
@PostConstruct
public void initObserver() {
System.out.println("初始化时启动一个线程");
executorService.submit(() -> {
while (true) {
try {
System.out.println("循环从阻塞队列里面获取数据,take是阻塞队列没有数据就会阻塞住");
DataEvent dataMsg = queue.take();
System.out.println("从阻塞队列获取到数据:" + dataMsg.getMsg());
eventNotify(dataMsg);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
private void eventNotify(DataEvent event) {
System.out.println("循环所有的监听者");
for (MyListener myListener : listenerList) {
myListener.onEvent(event);
}
}
}
复制代码
不同之处就是引入了阻塞队列,让通知这个操作变成异步操作,既只需要将 event 时间放入阻塞队列之后就可以直接返回了。不用像LoopObserverImpl
要等到 listener 注册表循环完毕才能返回。这样就实现了通知操作和循环 listener 注册表的解耦和异步。
举例说明异步实现和同步实现的区别:同步:还是团建群的例子,假如领导是保姆型领导,通知下来任务之后可能不太放心,要挨个问,小张你准备什么表演阿,大概多久能准备好鸭。小红你呢→_→。。。异步:假如是甩手掌柜型领导,发布完消息之后他就不管了。上面就是同步和异步的区别,同步就是领导是个保姆,挨个问挨个了解情况之后这个事情才算完。异步就是领导发布完消息就完事儿。
开源框架的实现
同步方式
spring 的发布订阅就是基于同步的观察者模式: 简单来说就是将所有的监听者注册到一个列表里面,然后当发布事件时,通过循环监听者列表,在循环里面调用每个监听者的 onEvent 方法,每个监听者实现的在 onEvent 方法里面判断传入的 event 是否属于当前需要的 event,属于就处理该事件,反之不处理。
spring 的ApplicationEventMulticaster
就是示例讲的观察者顶层接口
ApplicationListener
就是示例代码的监听者顶层接口
在refresh
方法里面调用的registerListeners();
方法就是将所有的监听者实现类注册到观察者的注册表中
ApplicationEventMulticaster
的multicastEvent
方法就是上面讲的通知方法,这里就是循环监听者注册表,调用每个监听者的 onApplicationEvent 方法(这里的invokeListener
方法里面最终会调用到listener.onApplicationEvent(event);
)
随便看一个onApplicationEvent
方法的实现,跟上面的例子是不是很相似
异步方式
nacos 中有很多地方都使用到了观察者模式,如 client 端和 server 端建立连接,发布连接事件,相关监听者做相应的处理,断开连接也是一样。
在 server 端接收到 client 端的注册请求后,会发布一个注册事件的通知
在 nacos-server 启动的时候也是会开启一个线程做死循环,循环的去 queue 里面 take 数据,如果没有的话就会阻塞。所以死循环只有在 queue 里面一直有数据的时候才会一直循环,当 queue 里面没有数据的时候就会阻塞在queue.take();
方法处。
我们看看receiveEvent(event);
方法里面做了什么,这里就体现了框架里面设计的精妙:在上面我们自己的设计中,这里应该是需要循环调用所有的 listener 的onApplicationEvent
方法,但是当注册表中 listener 太多的时候就会出现(有些 event 可能会有多个 listener 需要处理)循环调用太慢的问题,这里使用多线程的处理方式,让这些调用并行处理,大大的提高了框架的事件处理效率。
关于业务使用场景
可以说观察者模式能解决的,消息队列也可以解决,并且可以做的更好。主要根据实际情况取舍。
当公司的服务器资源充足,并且用户量大,相关业务逻辑调用频繁,消息要求高可靠性,以及消息要求发布订阅更灵活,就可以考虑使用消息队列。
当服务器资源不充足,或者调用比较少,或者希望使用轻量的通知机制,对于消息可靠性要求不高,可以考虑在项目代码里面使用观察者模式。当然使用观察者模式比较麻烦的一点就是要自己写一定量的代码,而且功能还不如消息队列的强大,并且不能保证消息的可靠性,当观察者获取消息在自己的处理逻辑里面产生异常时,可能还需要自己先写好发生异常后的降级代码(当然如果对可靠性要求不高的业务场景就不需要)。
为什么框架使用观察者模式而不使用消息队列(个人理解):
消息队列太重;
本身就是开源框架(本省代表原创),不适合再引入另一个很重的消息队列。增加用户的使用和部署成本以及难度,对于自身的推广也不利。
总结
看到这里其实我就想告诉大家,设计模式其实只是一种思维方式,我们学习设计模式只是了解一个基本的编程思维方式,在实际的使用过程中是需要根据实际情况变化的。观察者模式也是如此,只要思想不滑坡,你可以创造出很多种不同实现方式的观察者模式。
评论