写点什么

SpringCloudRPC 调用核心原理:RxJava 响应式编程框架,观察者模式

用户头像
极客good
关注
发布于: 刚刚

此模式的角色中有一个可观察的主题对象 Subject,有多个观察者 Observer 去关注它。当 Subject 的状态发生变化时,会自动通知这些 Observer 订阅者,令 Observer 做出响应。


在整个观察者模式中一共有 4 个角色:Subject(抽象主题、抽象被观察者)、Concrete Subject(具体主题、具体被观察者)、Observer(抽象观察者)以及 ConcreteObserver(具体观察者)。


观察者模式的 4 个角色以及它们之间的关系如图 4-1 所示。



图 4-1 观察者模式的 4 个角色以及它们之间的关系


观察者模式中 4 个角色的介绍如下:


(1)Subject(抽象主题):Subject 抽象主题的主要职责之一为维护 Observer 观察者对象的集合,集合里的所有观察者都订阅过该主题。Subject 抽象主题负责提供一些接口,可以增加、删除和更新观察者对象。


(2)ConcreteSubject(具体主题):ConcreteSubject 用于保持主题的状态,并且在主题的状态发生变化时给所有注册过的观察者发出通知。具体来说,ConcreteSubject 需要调用 Subject(抽象主题)基类的通知方法给所有注册过的观察者发出通知。


(3)Observer(抽象观察者):观察者的抽象


【一线大厂Java面试题解析+核心总结学习笔记+最新架构讲解视频+实战项目源码讲义】
浏览器打开:qq.cn.hn/FTf 免费领取
复制代码


类定义更新接口,使得被观察者可以在收到主题通知的时候更新自己的状态。


(4)ConcreteObserver(具体观察者):实现抽象观察者 Observer 所定义的更新接口,以便在收到主题的通知时完成自己状态的真正更新。


观察者模式的经典实现


==========


首先来看 Subject 主题类的代码实现:它将所有订阅过自己的 Observer 观察者对象保存在一个集合中,然后提供一组方法完成 Observer 观察者的新增、删除和通知。


Subject 主题类的参考代码实现如下:


package com.crazymaker.demo.observerPattern;


import lombok.extern.slf4j.Slf4j;


import java.util.ArrayList;


import java.util.List;


@Slf4j


public class Subject {


//保存订阅过自己的观察者对象


private List<Observer> observers = new ArrayList<>();


//观察者对象订阅


public void add(Observer observer) {


observers.add(observer);


log.info( "add an observer");


}


//观察者对象注销


public void remove(Observer observer) {


observers.remove(observer);


log.info( "remove an observer");


}


//通知所有注册的观察者对象


public void notifyObservers(String newState) {


for (Observer observer : observers) {


observer.update(newState);


}


}


}


接着来看 ConcreteSubject 具体主题类:它首先拥有一个成员用于保持主题的状态,并且在主题的状态变化时调用基类 Subject(抽象主题)的通知方法给所有注册过的观察者发出通知。


package com.crazymaker.demo.observerPattern;


import lombok.extern.slf4j.Slf4j;


@Data


@Slf4jpublic class ConcreteSubject extends Subject {


private String state; //保持主题的状态


public void change(String newState) {


state = newState;


log.info( "change state :" + newState);


//状态发生改变,通知观察者


notifyObservers(newState);


}


}


然后来看一下观察者 Observer 接口,它抽象出了一个观察者自身的状态更新方法。


package com.crazymaker.demo.observerPattern;


public interface Observer {


void update(String newState); //状态更新的方法


}


接着来看 ConcreteObserver 具体观察者类:它首先接收主题的通知,实现抽象观察者 Observer 所定义的 update 接口,以便在收到主题的状态发生变化时完成自己的状态更新。


package com.crazymaker.demo.observerPattern;


import lombok.extern.slf4j.Slf4j;


@Slf4j


public class ObserverA implements Observer {


//观察者状态


private String observerState;


@Override


public void update(String newState) {


//更新观察者状态,让它与主题的状态一致


observerState = newState;


log.info( "观察者的当前状态为:"+observerState);


}


}


4 个角色的实现代码已经介绍完了。如何使用观察者模式呢?步骤如下:


package com.crazymaker.demo.observerPattern;


public class ObserverPatternDemo {


public static void main(String[] args) {


//第一步:创建主题


ConcreteSubject mConcreteSubject = new ConcreteSubject();


//第二步:创建观察者


Observer observerA = new ObserverA();


Observer ObserverB = new ObserverA();


//第三步:主题订阅


mConcreteSubject.add(observerA);


mConcreteSubject.add(ObserverB);


//第四步:主题状态变更


mConcreteSubject.change("倒计时结束,开始秒杀");


}


}


运行示例程序,结果如下:


22:46:03.548 [main] INFO c.c.d.o.ConcreteSubject - change state:倒计时结束,开始秒杀


22:46:03.548 [main] INFO c.c.d.o.ObserverA -观察者的当前状态为:倒计时结束,开始秒杀


22:46:03.548 [main] INFO c.c.d.o.ObserverA - 观察者的当前状态为:倒计时结束,开始秒杀


RxJava 中的观察者模式


=============


RxJava 是基于观察者模式设计的。RxJava 中的 Observable 类和 Subscriber 类分别对应观察者模式中的 Subject(抽象主题)和 Observer(抽象观察者)两个角色。


在 RxJava 中,Observable 和 Subscriber 通过 subscribe()方法实现订阅关系,如图 4-2 所示。



图 4-2 RxJava 通过 subscribe()方法实现订阅关系


在 RxJava 中,Observable 和 Subscriber 之间通过 emitter.onNext(...)弹射的方式实现主题的消息发布,如图 4-3 所示。



图 4-3 RxJava 通过 emitter.onNext()弹射主题消息


RxJava 中主题的消息发布方式之一是通过内部的弹射器 Emitter 完成。Emitter 除了使用 onNext()方法弹射消息之外,还定义了两个特殊的通知方法:onCompleted()和 onError()。


(1)onCompleted():表示消息序列弹射完结。


RxJava 主题(可观察者)中的 Emitter 可以不只发布(弹射)一个消息,可以重复使用其 onNext()方法弹射一系列消息(或事件),这一系列消息组成一个序列。在绝大部分场景下,Observable 内部有一个专门的 Queue(队列)来负责缓存消息序列。当 Emitter 明确不会再有新的消息弹射出来时,需要触发 onCompleted()方法,作为消息序列的结束标志。


RxJava 主题(可观察者)的 Emitter 弹射器所弹出的消息序列也可以称为消息流。


(2)onError():表示主题的消息序列异常终止。


如果 Observable 在事件处理过程中出现异常,Emitter 的 onError()就会被触发,同时消息序列自动终止,不允许再有消息弹射出来。


RxJava 的一个简单使用示例代码如下:


package com.crazymaker.demo.observerPattern;


//省略 import


@Slf4j


public class RxJavaObserverDemo {


/**


*演示 RxJava 中的 Observer 模式


*/


@Test


public void rxJavaBaseUse() {


//被观察者(主题)


Observable observable = Observable.create(


new Action1<Emitter<String>>() {


@Override


public void call(Emitter<String> emitter) {


emitter.onNext("apple");


emitter.onNext("banana");


emitter.onNext("pear"); emitter.onCompleted();


}


},Emitter.BackpressureMode.NONE);


//订阅者(观察者)


Subscriber<String> subscriber = new Subscriber<String>() {


@Override


public void onNext(String s) {


log.info("onNext: {}", s);


}


@Override


public void onCompleted() {


log.info("onCompleted");


}


@Override


public void onError(Throwable e) {


log.info("onError");


}


};


//订阅:Observable 与 Subscriber 之间依然通过 subscribe()进行关联


observable.subscribe(subscriber);


}


}


运行这个示例程序,结果如下:


11:29:07.555 [main] INFO c.c.d.o.RxJavaObserverDemo - onNext: apple


11:29:07.564 [main] INFO c.c.d.o.RxJavaObserverDemo - onNext: banana


11:29:07.564 [main] INFO c.c.d.o.RxJavaObserverDemo - onNext: pear


11:29:07.564 [main] INFO c.c.d.o.RxJavaObserverDemo - onCompleted


通过代码和运行接口可以看出:被观察者 Observable 与观察者 Subscriber 产生关联通过 subscribe()方法完成。当订阅开始时,Observable 主题便开始发送事件。


通过代码还可以看出:Subscriber 有 3 个回调方法,其中 onNext(String s)回调方法用于响应 Observable 主题正常的弹射消息,onCompleted()回调方法用于响应 Observable 主题的结束消息,onError(Throwable e)回调方法用于响应 Observable 主题的异常消息。在一个消息序列中,Emitter 弹射器的 onCompleted()正常结束和 onError()异常终止只能调用一个,并且必须是消息序列中最后一个被发送的消息。换句话说,Emitter 的 onCompleted()和 onError()两个方法是互斥的,在消息序列中调用了其中一个,就不可以再调用另一个。


通过示例可以看出,RxJava 与经典的观察者模式不同。在 RxJava 中,主题内部有一个弹射器的角色,而经典的观察者模式中,主题所发送的是单个消息,并不是一个消息序列。


在 RxJava 中,Observable 主题还会负责消息序列缓存,这一点像经典的生产者/消费者模式。在经典的生产者/消费者模式中,生产者生产数据后放入缓存队列,自己不进行处理,而消费者从缓存队列里拿到所要处理的数据,完成逻辑处理。从这一点来说,RxJava 借鉴了生产者消费者模式的思想。


RxJava 的不完整回调


=============


Java 8 引入函数式编程方式大大地提高了编码效率。但是,Java8 的函数式编程有一个非常重要的要求:需要函数式接口作为支撑。什么是函数式接口呢?指的是有且只有一个抽象方法的接口,比如 Java 中内置的 Runnable 接口。


RxJava 的一大特色是支持函数式的编程。由于标准的 Subscriber 观察者接口有 3 个抽象方法,当然就不是一个函数式接口,因此直接使用 Subscriber 观察者接口是不支持函数式编程的。


RxJava 为了支持函数式编程,另外定义了几个函数式接口,比较重要的有 Action0 和 Action1。


1.Action0 回调接口


这是一个无参数、无返回值的函数式接口,源码如下:


package rx.functions;


/**


*A zero-argument action.


*/


public interface Action0 extends Action {


void call();


}


Action0 接口的 call()方法无参数、无返回值,它的具体使用场景对应于 Subscriber 观察者中的 onCompleted()回调方法的使用场景,因为 Subscriber 的 onCompleted()回调方法也是无参数、无返回值的。


2.Action1 回调接口


这是一个有一个参数、泛型、无返回值的函数式接口,源码如下:


package rx.functions;


/**


*A one-argument action.


*@param <T> the first argument type


*/


public interface Action1<T> extends Action {


void call(T t);


}


Action1 回调接口主要有以下两种用途:


(1)作为函数式编程替代使用 Subscriber 的 onNext()方法的传统编程,前提是 Action1 回调接口的泛型类型与 Subscriber 的 onNext()回调方法的参数类型保持一致。

用户头像

极客good

关注

还未添加个人签名 2021.03.18 加入

还未添加个人简介

评论

发布
暂无评论
SpringCloudRPC调用核心原理:RxJava响应式编程框架,观察者模式