SpringCloudRPC 调用核心原理:RxJava 响应式编程框架,观察者模式
此模式的角色中有一个可观察的主题对象 Subject,有多个观察者 Observer 去关注它。当 Subject 的状态发生变化时,会自动通知这些 Observer 订阅者,令 Observer 做出响应。
在整个观察者模式中一共有 4 个角色:Subject(抽象主题、抽象被观察者)、Concrete Subject(具体主题、具体被观察者)、Observer(抽象观察者)以及 ConcreteObserver(具体观察者)。
观察者模式的 4 个角色以及它们之间的关系如图 4-1 所示。
图 4-1 观察者模式的 4 个角色以及它们之间的关系
观察者模式中 4 个角色的介绍如下:
(1)Subject(抽象主题):Subject 抽象主题的主要职责之一为维护 Observer 观察者对象的集合,集合里的所有观察者都订阅过该主题。Subject 抽象主题负责提供一些接口,可以增加、删除和更新观察者对象。
(2)ConcreteSubject(具体主题):ConcreteSubject 用于保持主题的状态,并且在主题的状态发生变化时给所有注册过的观察者发出通知。具体来说,ConcreteSubject 需要调用 Subject(抽象主题)基类的通知方法给所有注册过的观察者发出通知。
(3)Observer(抽象观察者):观察者的抽象
类定义更新接口,使得被观察者可以在收到主题通知的时候更新自己的状态。
(4)ConcreteObserver(具体观察者):实现抽象观察者 Observer 所定义的更新接口,以便在收到主题的通知时完成自己状态的真正更新。
观察者模式的经典实现
==========
首先来看 Subject 主题类的代码实现:它将所有订阅过自己的 Observer 观察者对象保存在一个集合中,然后提供一组方法完成 Observer 观察者的新增、删除和通知。
Subject 主题类的参考代码实现如下:
package com.crazymaker.demo.observerPattern;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class Subject {
//保存订阅过自己的观察者对象
private List<Observer> observers = new ArrayList<>();
//观察者对象订阅
public void add(Observer observer) {
observers.add(observer);
log.info( "add an observer");
}
//观察者对象注销
public void remove(Observer observer) {
observers.remove(observer);
log.info( "remove an observer");
}
//通知所有注册的观察者对象
public void notifyObservers(String newState) {
for (Observer observer : observers) {
observer.update(newState);
}
}
}
接着来看 ConcreteSubject 具体主题类:它首先拥有一个成员用于保持主题的状态,并且在主题的状态变化时调用基类 Subject(抽象主题)的通知方法给所有注册过的观察者发出通知。
package com.crazymaker.demo.observerPattern;
import lombok.extern.slf4j.Slf4j;
@Data
@Slf4jpublic class ConcreteSubject extends Subject {
private String state; //保持主题的状态
public void change(String newState) {
state = newState;
log.info( "change state :" + newState);
//状态发生改变,通知观察者
notifyObservers(newState);
}
}
然后来看一下观察者 Observer 接口,它抽象出了一个观察者自身的状态更新方法。
package com.crazymaker.demo.observerPattern;
public interface Observer {
void update(String newState); //状态更新的方法
}
接着来看 ConcreteObserver 具体观察者类:它首先接收主题的通知,实现抽象观察者 Observer 所定义的 update 接口,以便在收到主题的状态发生变化时完成自己的状态更新。
package com.crazymaker.demo.observerPattern;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ObserverA implements Observer {
//观察者状态
private String observerState;
@Override
public void update(String newState) {
//更新观察者状态,让它与主题的状态一致
observerState = newState;
log.info( "观察者的当前状态为:"+observerState);
}
}
4 个角色的实现代码已经介绍完了。如何使用观察者模式呢?步骤如下:
package com.crazymaker.demo.observerPattern;
public class ObserverPatternDemo {
public static void main(String[] args) {
//第一步:创建主题
ConcreteSubject mConcreteSubject = new ConcreteSubject();
//第二步:创建观察者
Observer observerA = new ObserverA();
Observer ObserverB = new ObserverA();
//第三步:主题订阅
mConcreteSubject.add(observerA);
mConcreteSubject.add(ObserverB);
//第四步:主题状态变更
mConcreteSubject.change("倒计时结束,开始秒杀");
}
}
运行示例程序,结果如下:
22:46:03.548 [main] INFO c.c.d.o.ConcreteSubject - change state:倒计时结束,开始秒杀
22:46:03.548 [main] INFO c.c.d.o.ObserverA -观察者的当前状态为:倒计时结束,开始秒杀
22:46:03.548 [main] INFO c.c.d.o.ObserverA - 观察者的当前状态为:倒计时结束,开始秒杀
RxJava 中的观察者模式
=============
RxJava 是基于观察者模式设计的。RxJava 中的 Observable 类和 Subscriber 类分别对应观察者模式中的 Subject(抽象主题)和 Observer(抽象观察者)两个角色。
在 RxJava 中,Observable 和 Subscriber 通过 subscribe()方法实现订阅关系,如图 4-2 所示。
图 4-2 RxJava 通过 subscribe()方法实现订阅关系
在 RxJava 中,Observable 和 Subscriber 之间通过 emitter.onNext(...)弹射的方式实现主题的消息发布,如图 4-3 所示。
图 4-3 RxJava 通过 emitter.onNext()弹射主题消息
RxJava 中主题的消息发布方式之一是通过内部的弹射器 Emitter 完成。Emitter 除了使用 onNext()方法弹射消息之外,还定义了两个特殊的通知方法:onCompleted()和 onError()。
(1)onCompleted():表示消息序列弹射完结。
RxJava 主题(可观察者)中的 Emitter 可以不只发布(弹射)一个消息,可以重复使用其 onNext()方法弹射一系列消息(或事件),这一系列消息组成一个序列。在绝大部分场景下,Observable 内部有一个专门的 Queue(队列)来负责缓存消息序列。当 Emitter 明确不会再有新的消息弹射出来时,需要触发 onCompleted()方法,作为消息序列的结束标志。
RxJava 主题(可观察者)的 Emitter 弹射器所弹出的消息序列也可以称为消息流。
(2)onError():表示主题的消息序列异常终止。
如果 Observable 在事件处理过程中出现异常,Emitter 的 onError()就会被触发,同时消息序列自动终止,不允许再有消息弹射出来。
RxJava 的一个简单使用示例代码如下:
package com.crazymaker.demo.observerPattern;
//省略 import
@Slf4j
public class RxJavaObserverDemo {
/**
*演示 RxJava 中的 Observer 模式
*/
@Test
public void rxJavaBaseUse() {
//被观察者(主题)
Observable observable = Observable.create(
new Action1<Emitter<String>>() {
@Override
public void call(Emitter<String> emitter) {
emitter.onNext("apple");
emitter.onNext("banana");
emitter.onNext("pear"); emitter.onCompleted();
}
},Emitter.BackpressureMode.NONE);
//订阅者(观察者)
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
log.info("onNext: {}", s);
}
@Override
public void onCompleted() {
log.info("onCompleted");
}
@Override
public void onError(Throwable e) {
log.info("onError");
}
};
//订阅:Observable 与 Subscriber 之间依然通过 subscribe()进行关联
observable.subscribe(subscriber);
}
}
运行这个示例程序,结果如下:
11:29:07.555 [main] INFO c.c.d.o.RxJavaObserverDemo - onNext: apple
11:29:07.564 [main] INFO c.c.d.o.RxJavaObserverDemo - onNext: banana
11:29:07.564 [main] INFO c.c.d.o.RxJavaObserverDemo - onNext: pear
11:29:07.564 [main] INFO c.c.d.o.RxJavaObserverDemo - onCompleted
通过代码和运行接口可以看出:被观察者 Observable 与观察者 Subscriber 产生关联通过 subscribe()方法完成。当订阅开始时,Observable 主题便开始发送事件。
通过代码还可以看出:Subscriber 有 3 个回调方法,其中 onNext(String s)回调方法用于响应 Observable 主题正常的弹射消息,onCompleted()回调方法用于响应 Observable 主题的结束消息,onError(Throwable e)回调方法用于响应 Observable 主题的异常消息。在一个消息序列中,Emitter 弹射器的 onCompleted()正常结束和 onError()异常终止只能调用一个,并且必须是消息序列中最后一个被发送的消息。换句话说,Emitter 的 onCompleted()和 onError()两个方法是互斥的,在消息序列中调用了其中一个,就不可以再调用另一个。
通过示例可以看出,RxJava 与经典的观察者模式不同。在 RxJava 中,主题内部有一个弹射器的角色,而经典的观察者模式中,主题所发送的是单个消息,并不是一个消息序列。
在 RxJava 中,Observable 主题还会负责消息序列缓存,这一点像经典的生产者/消费者模式。在经典的生产者/消费者模式中,生产者生产数据后放入缓存队列,自己不进行处理,而消费者从缓存队列里拿到所要处理的数据,完成逻辑处理。从这一点来说,RxJava 借鉴了生产者消费者模式的思想。
RxJava 的不完整回调
=============
Java 8 引入函数式编程方式大大地提高了编码效率。但是,Java8 的函数式编程有一个非常重要的要求:需要函数式接口作为支撑。什么是函数式接口呢?指的是有且只有一个抽象方法的接口,比如 Java 中内置的 Runnable 接口。
RxJava 的一大特色是支持函数式的编程。由于标准的 Subscriber 观察者接口有 3 个抽象方法,当然就不是一个函数式接口,因此直接使用 Subscriber 观察者接口是不支持函数式编程的。
RxJava 为了支持函数式编程,另外定义了几个函数式接口,比较重要的有 Action0 和 Action1。
1.Action0 回调接口
这是一个无参数、无返回值的函数式接口,源码如下:
package rx.functions;
/**
*A zero-argument action.
*/
public interface Action0 extends Action {
void call();
}
Action0 接口的 call()方法无参数、无返回值,它的具体使用场景对应于 Subscriber 观察者中的 onCompleted()回调方法的使用场景,因为 Subscriber 的 onCompleted()回调方法也是无参数、无返回值的。
2.Action1 回调接口
这是一个有一个参数、泛型、无返回值的函数式接口,源码如下:
package rx.functions;
/**
*A one-argument action.
*@param <T> the first argument type
*/
public interface Action1<T> extends Action {
void call(T t);
}
Action1 回调接口主要有以下两种用途:
(1)作为函数式编程替代使用 Subscriber 的 onNext()方法的传统编程,前提是 Action1 回调接口的泛型类型与 Subscriber 的 onNext()回调方法的参数类型保持一致。
评论