响应式编程核心组件
步入正题之前,我希望你对发布者/订阅者模型有一些了解。
直接看图:
Talk is cheap, show you the code!
public class Main {
public static void main(String[] args) {
Flux<Integer> flux = Flux.range(0, 10);
flux.subscribe(i -> {
System.out.println("run1: " + i);
});
flux.subscribe(i -> {
System.out.println("run2: " + i);
});
}
}
复制代码
输出:
run1: 0
run1: 1
run1: 2
run1: 3
run1: 4
run1: 5
run1: 6
run1: 7
run1: 8
run1: 9
run2: 0
run2: 1
run2: 2
run2: 3
run2: 4
run2: 5
run2: 6
run2: 7
run2: 8
run2: 9
Process finished with exit code 0
复制代码
Flux
Flux 是一个多元素的生产者,言外之意,它可以生产多个元素,组成元素序列,供订阅者使用。
Mono
Mono 和 Flux 的区别在于,它只能生产一个元素供生产者订阅,也就是数量的不同。
Mono 的一个常见的应用就是 Mono<ServerResponse>作为 WebFlux 的返回值。毕竟每次请求只有一个 Response 对象,所以 Mono 刚刚好。
快速创建一个 Flux/Mono 并订阅它
来看一些官方文档演示的方法。
Flux<String> seq1 = Flux.just("foo", "bar", "foobar");
List<String> iterable = Arrays.asList("foo", "bar", "foobar");
Flux<String> seq2 = Flux.fromIterable(iterable);
Mono<String> noData = Mono.empty();
Mono<String> data = Mono.just("foo");
Flux<Integer> numbersFromFiveToSeven = Flux.range(5, 3);
复制代码
subscribe()方法(Lambda 形式)
public class FluxIntegerWithSubscribe {
public static void main(String[] args) {
Flux<Integer> integerFlux = Flux.range(0, 10);
integerFlux.subscribe(i -> {
System.out.println("run");
System.out.println(i);
}, error -> {
System.out.println("error");
}, () -> {
System.out.println("done");
}, p -> {
p.request(2);
});
}
}
复制代码
如果去掉初次请求,那么会请求最大值:
public class FluxIntegerWithSubscribe {
public static void main(String[] args) {
Flux<Integer> integerFlux = Flux.range(0, 10);
// 在这里说明一下subscribe()第四个参数,指出了当订阅信号到达,初次请求的个数,如果是null则全部请求(Long.MAX_VALUE)
// 其余subscribe()详见源码或文档:https://projectreactor.io/docs/core/release/reference/#flux
integerFlux.subscribe(i -> {
System.out.println("run");
System.out.println(i);
}, error -> {
System.out.println("error");
}, () -> {
System.out.println("done");
});
}
}
复制代码
输出:
run
0
run
1
run
2
run
3
run
4
run
5
run
6
run
7
run
8
run
9
done
Process finished with exit code 0
复制代码
继承 BaseSubscriber(非 Lambda 形式)
public class FluxWithBaseSubscriber {
public static void main(String[] args) {
Flux<Integer> integerFlux = Flux.range(0, 10);
integerFlux.subscribe(new MySubscriber());
}
/**
* 一般来说,通过继承BaseSubscriber<T>来实现,而且一般自定义hookOnSubscribe()和hookOnNext()方法
*/
private static class MySubscriber extends BaseSubscriber<Integer> {
/**
* 初次订阅时被调用
*/
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("开始啦!");
// 记得至少请求一次,否则不会执行hookOnNext()方法
request(1);
}
/**
* 每次读取新值调用
*/
@Override
protected void hookOnNext(Integer value) {
System.out.println("开始读取...");
System.out.println(value);
// 指出下一次读取多少个
request(2);
}
@Override
protected void hookOnComplete() {
System.out.println("结束啦");
}
}
}
复制代码
输出:
开始啦!
开始读取...
0
开始读取...
1
开始读取...
2
开始读取...
3
开始读取...
4
开始读取...
5
开始读取...
6
开始读取...
7
开始读取...
8
开始读取...
9
结束啦
Process finished with exit code 0
复制代码
终止订阅:Disposable
在这里使用多线程模拟生产者生产的很快,然后立马取消订阅(虽然立刻取消但是由于生产者实在太快了,所以订阅者还是接收到了一些元素)。
其他的方法,比如 Disposables.composite()会得到一个 Disposable 的集合,调用它的 dispose()方法会把集合里的所有 Disposable 的 dispose()方法都调用。
public class FluxWithDisposable {
public static void main(String[] args) {
Disposable disposable = getDis();
// 每次打印数量一般不同,因为调用了disposable的dispose()方法进行了取消,不过如果生产者产地太快了,那么可能来不及终止。
disposable.dispose();
}
private static Disposable getDis() {
class Add implements Runnable {
private final FluxSink<Integer> fluxSink;
public Add(FluxSink<Integer> fluxSink) {
this.fluxSink = fluxSink;
}
@Override
public synchronized void run() {
fluxSink.next(new Random().nextInt());
}
}
Flux<Integer> integerFlux = Flux.create(integerFluxSink -> {
Add add = new Add(integerFluxSink);
new Thread(add).start();
new Thread(add).start();
new Thread(add).start();
new Thread(add).start();
new Thread(add).start();
new Thread(add).start();
new Thread(add).start();
new Thread(add).start();
new Thread(add).start();
new Thread(add).start();
new Thread(add).start();
});
return integerFlux.subscribe(System.out::println);
}
}
复制代码
输出:
这里的输出每次调用可能都会不同,因为订阅之后取消了,所以能打印多少取决于那一瞬间CPU的速度。
复制代码
调整发布者发布速率
public class FluxWithLimitRate1 {
public static void main(String[] args) {
Flux<Integer> integerFlux = Flux.range(0, 100);
integerFlux.subscribe(new MySubscriber());
}
private static class MySubscriber extends BaseSubscriber<Integer> {
@Override
protected void hookOnSubscribe(Subscription subscription) {
System.out.println("开始啦!");
// 记得至少请求一次,否则不会执行hookOnNext()方法
request(1);
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("开始读取...");
System.out.println(value);
// 指出下一次读取多少个
request(2);
}
@Override
protected void hookOnComplete() {
System.out.println("结束啦!");
}
}
}
复制代码
public class FluxWithLimitRate2 {
public static void main(String[] args) {
Flux<Integer> integerFlux = Flux.range(0, 100);
// 最后,来看一些Flux提供的预获取方法:
// 指出预取数量
integerFlux.limitRate(10);
// lowTide指出预获取操作的补充优化的值,即修改75%的默认值;highTide指出预获取数量。
integerFlux.limitRate(10, 15);
// 哎~最典型的就是,请求无数:request(Long.MAX_VALUE)但是我给你limitRate(2);那你也只能乖乖每次得到两个哈哈哈哈!
// 还有一个就是limitRequest(N),它会把下流总请求限制为N。如果下流请求超过了N,那么只返回N个,否则返回实际数量。然后认为请求完成,向下流发送onComplete信号。
integerFlux.limitRequest(5).subscribe(new MySubscriber());
// 上面这个只会输出5个。
}
}
复制代码
程序化地创建一个序列
静态同步方法:generate()
现在到了程序化生成 Flux/Mono 的时候。首先介绍 generate()方法,这是一个同步的方法。言外之意就是,它是线程不安全的,且它的接收器只能一次一个的接受输入来生成 Flux/Mono。也就是说,它在任意时刻只能被调用一次且只接受一个输入。
或者这么说,它生成的元素序列的顺序,取决于代码编写的方式。
public class FluxWithGenerate {
public static void main(String[] args) {
// 下面这个是它的变种方法之一:第一个参数是提供初始状态的,第二个参数是一个向接收器写入数据的生成器,入参为state(一般为整数,用来记录状态),和接收器。
// 其他变种请看源码
Flux.generate(() -> 0, (state, sink) -> {
sink.next(state+"asdf");
// 加上对于sink.complete()的调用即可终止生成;否则就是无限序列。
return state+1;
}).subscribe(System.out::println);
// generate方法的第三个参数用于结束生成时被调用,消耗state。
Flux.generate(AtomicInteger::new, (state, sink) -> {
sink.next(state.getAndIncrement()+"qwer");
return state;
}).subscribe(System.out::println);
// generate()的工作流看起来就像:next()->next()->next()->...
}
}
复制代码
静态异步多线程方法:create()
说完了同步生成,接下来就是异步生成,还是多线程的!让我们有请:create()闪亮登场!!!
create()方法对外暴露出一个 FluxSink 对象,通过它我们可以访问并生成需要的序列。除此之外,它还可以触发回调中的多线程事件。
create 另一特性就是很容易把其他的接口与响应式桥接起来。注意,它是异步多线程并不意味着 create 可以并行化你写的代码或者异步执行;怎么理解呢?就是,create 方法里面的 Lambda 表达式代码还是单线程阻塞的。如果你在创建序列的地方阻塞了代码,那么可能造成订阅者即使请求了数据,也得不到,因为序列被阻塞了,没法生成新的。
其实通过上面的现象可以猜测,默认情况下订阅者使用的线程和 create 使用的是一个线程,当然阻塞 create 就会导致订阅者没法运行咯!
上述问题可以通过 Scheduler 解决,后面会提到。
public class FluxWithCreate {
public static void main(String[] args) throws InterruptedException {
TestProcessor<String> testProcessor = new TestProcessor<>() {
private TestListener<String> testListener;
@Override
public void register(TestListener<String> stringTestListener) {
this.testListener = stringTestListener;
}
@Override
public TestListener<String> get() {
return testListener;
}
};
Flux<String> flux = Flux.create(stringFluxSink -> testProcessor.register(new TestListener<String>() {
@Override
public void onChunk(List<String> chunk) {
for (String s : chunk) {
stringFluxSink.next(s);
}
}
@Override
public void onComplete() {
stringFluxSink.complete();
}
}));
flux.subscribe(System.out::println);
System.out.println("现在是2020/10/22 22:58;我好困");
TestListener<String> testListener = testProcessor.get();
Runnable1<String> runnable1 = new Runnable1<>() {
private TestListener<String> testListener;
@Override
public void set(TestListener<String> testListener) {
this.testListener = testListener;
}
@Override
public void run() {
List<String> list = new ArrayList<>(10);
for (int i = 0; i < 10; ++ i) {
list.add(i+"-run1");
}
testListener.onChunk(list);
}
};
Runnable1<String> runnable2 = new Runnable1<>() {
private TestListener<String> testListener;
@Override
public void set(TestListener<String> testListener) {
this.testListener = testListener;
}
@Override
public void run() {
List<String> list = new ArrayList<>(10);
for (int i = 0; i < 10; ++ i) {
list.add(i+"-run2");
}
testListener.onChunk(list);
}
};
Runnable1<String> runnable3 = new Runnable1<>() {
private TestListener<String> testListener;
@Override
public void set(TestListener<String> testListener) {
this.testListener = testListener;
}
@Override
public void run() {
List<String> list = new ArrayList<>(10);
for (int i = 0; i < 10; ++ i) {
list.add(i+"-run3");
}
testListener.onChunk(list);
}
};
runnable1.set(testListener);
runnable2.set(testListener);
runnable3.set(testListener);
// create所谓的"异步","多线程"指的是在多线程中调用sink.next()方法。这一点在下面的push对比中可以看到
new Thread(runnable1).start();
new Thread(runnable2).start();
new Thread(runnable3).start();
Thread.sleep(1000);
testListener.onComplete();
// 另一方面,create的另一个变体可以设置参数来实现负压控制,具体看源码。
}
public interface TestListener<T> {
void onChunk(List<T> chunk);
void onComplete();
}
public interface TestProcessor<T> {
void register(TestListener<T> tTestListener);
TestListener<T> get();
}
public interface Runnable1<T> extends Runnable {
void set(TestListener<T> testListener);
}
}
复制代码
静态异步单线程方法:push()
说完了异步多线程,同步的生成方法,接下来就是异步单线程:push()。
其实说到 push 和 create 的对比,我个人理解如下:
create 允许多线程环境下调用.next()方法,只管生成元素,元素序列的顺序取决于...算了,随机的,毕竟多线程;
但是 push 只允许一个线程生产元素,所以是有序的,至于异步指的是在新的线程中也可以,而不必非得在当前线程。
顺带一提,push 和 create 都支持 onCancel()和 onDispose()操作。一般来说,onCancel 只响应于 cancel 操作,而 onDispose 响应于 error,cancel,complete 等操作。
public class FluxWithPush {
public static void main(String[] args) throws InterruptedException {
TestProcessor<String> testProcessor = new TestProcessor<>() {
private TestListener<String> testListener;
@Override
public void register(TestListener<String> testListener) {
this.testListener = testListener;
}
@Override
public TestListener<String> get() {
return this.testListener;
}
};
Flux<String> flux = Flux.push(stringFluxSink -> testProcessor.register(new TestListener<>() {
@Override
public void onChunk(List<String> list) {
for (String s : list) {
stringFluxSink.next(s);
}
}
@Override
public void onComplete() {
stringFluxSink.complete();
}
}));
flux.subscribe(System.out::println);
Runnable1<String> runnable = new Runnable1<>() {
private TestListener<String> testListener;
@Override
public void set(TestListener<String> testListener) {
this.testListener = testListener;
}
@Override
public void run() {
List<String> list = new ArrayList<>(10);
for (int i = 0; i < 10; ++i) {
list.add(UUID.randomUUID().toString());
}
testListener.onChunk(list);
}
};
TestListener<String> testListener = testProcessor.get();
runnable.set(testListener);
new Thread(runnable).start();
Thread.sleep(15);
testListener.onComplete();
}
public interface TestListener<T> {
void onChunk(List<T> list);
void onComplete();
}
public interface TestProcessor<T> {
void register(TestListener<T> testListener);
TestListener<T> get();
}
public interface Runnable1<T> extends Runnable {
void set(TestListener<T> testListener);
}
}
复制代码
同 create 一样,push 也支持负压调节。但是我没写出来,我试过的 Demo 都是直接请求 Long.MAX_VALUE,其实就是通过 sink.onRequest(LongConsumer)方法调用来实现负压控制的。原理在这,想深究的请自行探索,鄙人不才,花费一下午没实现。
实例方法:handle()
在 Flux 的实例方法里,handle 类似 filter 和 map 的操作。
public class FluxWithHandle {
public static void main(String[] args) {
Flux<String> stringFlux = Flux.push(stringFluxSink -> {
for (int i = 0; i < 10; ++ i) {
stringFluxSink.next(UUID.randomUUID().toString().substring(0, 5));
}
});
// 获取所有包含'a'的串
Flux<String> flux = stringFlux.handle((str, sink) -> {
String s = f(str);
if (s != null) {
sink.next(s);
}
});
flux.subscribe(System.out::println);
}
private static String f(String str) {
return str.contains("a") ? str : null;
}
}
复制代码
线程和调度
Schedulers 的那些静态方法
一般来说,响应式框架都不支持并发,P.s. create 那个是生产者并发,它本身不是并发的。所以也没有可用的并发库,需要开发者自己实现。
同时,每一个操作一般都是在上一个操作所在的线程里运行,它们不会拥有自己的线程,而最顶的操作则是和 subscribe()在同一个线程。比如 Flux.create(...).handle(...).subscribe(...)都在主线程运行的。
在响应式框架里,Scheduler 决定了操作在哪个线程被怎么执行,它的作用类似于 ExecutorService。不过功能稍微多点。如果你想实现一些并发操作,那么可以考虑使用 Schedulers 提供的静态方法,来看看有哪些可用的:
Schedulers.immediate(): 直接在当前线程提交 Runnable 任务,并立即执行。
package com.learn.reactor.flux;
import reactor.core.scheduler.Schedulers;
/**
* @author Mr.M
*/
public class FluxWithSchedulers {
public static void main(String[] args) throws InterruptedException {
// Schedulers.immediate(): 直接在当前线程提交Runnable任务,并立即执行。
System.out.println("当前线程:" + Thread.currentThread().getName());
System.out.println("zxcv");
Schedulers.immediate().schedule(() -> {
System.out.println("当前线程是:" + Thread.currentThread().getName());
System.out.println("qwer");
});
System.out.println("asdf");
// 确保异步任务可以打印出来
Thread.sleep(1000);
}
}
复制代码
通过上面看得出,immediate()其实就是在执行位置插入需要执行的 Runnable 来实现的。和直接把代码写在这里没什么区别。
Schedulers.newSingle():保证每次执行的操作都使用的是一个新的线程。
package com.learn.reactor.flux;
import reactor.core.scheduler.Schedulers;
/**
* @author Mr.M
*/
public class FluxWithSchedulers {
public static void main(String[] args) throws InterruptedException {
// 如果你想让每次调用都是一个新的线程的话,可以使用Schedulers.newSingle(),它可以保证每次执行的操作都使用的是一个新的线程。
Schedulers.single().schedule(() -> {
System.out.println("当前线程是:" + Thread.currentThread().getName());
System.out.println("bnmp");
});
Schedulers.single().schedule(() -> {
System.out.println("当前线程是:" + Thread.currentThread().getName());
System.out.println("ghjk");
});
Schedulers.newSingle("线程1").schedule(() -> {
System.out.println("当前线程是:" + Thread.currentThread().getName());
System.out.println("1234");
});
Schedulers.newSingle("线程1").schedule(() -> {
System.out.println("当前线程是:" + Thread.currentThread().getName());
System.out.println("5678");
});
Schedulers.newSingle("线程2").schedule(() -> {
System.out.println("当前线程是:" + Thread.currentThread().getName());
System.out.println("0100");
});
Thread.sleep(1000);
}
}
复制代码
Schedulers.single(),它的作用是为当前操作开辟一个新的线程,但是记住,所有使用这个方法的操作都共用一个线程;
Schedulers.elastic():一个弹性无界线程池。
无界一般意味着不可管理,因为它可能会导致负压问题和过多的线程被创建。所以马上就要提到它的替代方法。
Schedulers.bounededElastic():有界可复用线程池
package com.learn.reactor.flux;
import reactor.core.scheduler.Schedulers;
/**
* @author Mr.M
*/
public class FluxWithSchedulers {
public static void main(String[] args) throws InterruptedException {
Schedulers.boundedElastic().schedule(() -> {
System.out.println("当前线程是:" + Thread.currentThread().getName());
System.out.println("1478");
});
Schedulers.boundedElastic().schedule(() -> {
System.out.println("当前线程是:" + Thread.currentThread().getName());
System.out.println("2589");
});
Schedulers.boundedElastic().schedule(() -> {
System.out.println("当前线程是:" + Thread.currentThread().getName());
System.out.println("0363");
});
Thread.sleep(1000);
}
}
复制代码
Schedulers.boundedElastic()是一个更好的选择,因为它可以在需要的时候创建工作线程池,并复用空闲的池;同时,某些池如果空闲时间超过一个限定的数值就会被抛弃。
同时,它还有一个容量限制,一般 10 倍于 CPU 核心数,这是它后备线程池的最大容量。最多提交 10 万条任务,然后会被装进任务队列,等到有可用时再调度,如果是延时调度,那么延时开始时间是在有线程可用时才开始计算。
由此可见 Schedulers.boundedElastic()对于阻塞的 I/O 操作是一个不错的选择,因为它可以让每一个操作都有自己的线程。但是记得,太多的线程会让系统备受压力。
Schedulers.parallel():提供了系统级并行的能力
package com.learn.reactor.flux;
import reactor.core.scheduler.Schedulers;
/**
* @author Mr.M
*/
public class FluxWithSchedulers {
public static void main(String[] args) throws InterruptedException {
Schedulers.parallel().schedule(() -> {
System.out.println("当前线程是:" + Thread.currentThread().getName());
System.out.println("6541");
});
Schedulers.parallel().schedule(() -> {
System.out.println("当前线程是:" + Thread.currentThread().getName());
System.out.println("9874");
});
Thread.sleep(1000);
}
}
复制代码
最后,Schedulers.parallel()提供了并行的能力,它会创建数量等于 CPU 核心数的线程来实现这一功能。
其他线程操作
顺带一提,还可以通过 ExecutorService 创建新的 Scheduler。当然,Schedulers 的一堆 newXXX 方法也可以。
有一点很重要,就是 boundedElastic()方法可以适用于传统阻塞式代码,但是 single()和 parallel()都不行,如果你非要这么做那就会抛异常。自定义 Schedulers 可以通过设置 ThreadFactory 属性来设置接收的线程是否是被 NonBlocking 接口修饰的 Thread 实例。
Flux 的某些方法会使用默认的 Scheduler,比如 Flux.interval()方法就默认使用 Schedulers.parallel()方法,当然可以通过设置 Scheduler 来更改这种默认。
在响应式链中,有两种方式可以切换执行上下文,分别是 publishOn()和 subscribeOn()方法,前者在流式链中的位置很重要。在 Reactor 中,可以以任意形式添加任意数量的订阅者来满足你的需求,但是,只有在设置了订阅方法后,才能激活这条订阅链上的全部对象。只有这样,请求才会上溯到发布者,进而产生源序列。
在订阅链中切换执行上下文
publishOn()
publishOn()就和普通操作一样,添加在操作链的中间,它会影响在它下面的所有操作的执行上下文。看个例子:
public class FluxWithPublishOnSubscribeOn {
public static void main(String[] args) throws InterruptedException {
// 创建一个并行线程
Scheduler s = Schedulers.newParallel("parallel-scheduler", 4);
final Flux<String> flux = Flux
.range(1, 2)
// map肯定是跑在T上的。
.map(i -> 10 + i)
// 此时的执行上下文被切换到了并行线程
.publishOn(s)
// 这个map还是跑在并行线程上的,因为publishOn()的后面的操作都被切换到了另一个执行上下文中。
.map(i -> "value " + i);
// 假设这个new出来的线程名为T
new Thread(() -> flux.subscribe(System.out::println));
Thread.sleep(1000);
}
}
复制代码
subscribeOn()
public class FluxWithPublishOnSubscribeOn {
public static void main(String[] args) throws InterruptedException {
// 依旧是创建一个并行线程
Scheduler ss = Schedulers.newParallel("parallel-scheduler", 4);
final Flux<String> fluxflux = Flux
.range(1, 2)
// 不过这里的map就已经在ss里跑了
.map(i -> 10 + i)
// 这里切换,但是切换的是整个链
.subscribeOn(s)
// 这里的map也运行在ss上
.map(i -> "value " + i);
// 这是一个匿名线程TT
new Thread(() -> fluxflux.subscribe(System.out::println));
Thread.sleep(1000);
}
}
复制代码
subscribeOn()方法会把订阅之后的整个订阅链都切换到新的执行上下文中。无论在 subscribeOn()哪里,都可以把最前面的订阅之后的订阅序列进行切换,当然了,如果后面还有 publishOn(),publishOn()会进行新的切换。
评论