写点什么

深入讲解 RxJava 响应式编程框架,背压问题的几种应对模式

用户头像
小Q
关注
发布于: 2021 年 06 月 07 日

文章首发公众号:Java 架构师联盟,每日更新技术好文

背压

本节首先介绍什么是背压(Backpressure)问题,然后介绍背压问题的几种应对模式。


什么是背压问题

当上下游的流操作处于不同的线程时,如果上游弹射数据的速度快于下游接收处理数据的速度,对于那些没来得及处理的数据就会造成积压,这些数据既不会丢失,又不会被垃圾回收机制回收,而是存放在一个异步缓存池中,如果缓存池中的数据一直得不到处理,越积越多,最后就会造成内存溢出,这便是响应式编程中的背压问题。


一个存在背压问题的演示实例代码如下:


package com.crazymaker.demo.rxJava.basic;//省略import@Slf4jpublic class BackpressureDemo { /** *演示不使用背压 */ @Test public void testNoBackpressure() throws InterruptedException { //被观察者(主题) Observable observable = Observable.create( new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { //循环10次 for (int i = 0;i<10 ; i++) { log.info("produce ->" + i); subscriber.onNext(String.valueOf(i)); } } }); //观察者 Action1<String> subscriber = new Action1<String>() { public void call(String s){ try { //每消费一次间隔50毫秒 Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } log.info("consumer ->" + s); } }; //订阅:observable与subscriber之间依然通过subscribe()进行关联 observable .subscribeOn(Schedulers.io()) .observeOn(Schedulers.newThread()) .subscribe(subscriber); Thread.sleep(Integer.MAX_VALUE); }}
复制代码


在实例代码中,observable 发射操作执行在一条通过 Schedulers.io()调度器获取的 IO 线程上,而观察者 subscriber 的消费操作执行在另一条通过 Schedulers.newThread()调度器获取的新线程上。observable 流不断发送数据,累积发送 10 次;观察者 subscriber 每隔 50 毫秒接收一条数据。


运行上面的演示程序后,输出的结果如下:


17:56:17.719 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->017:56:17.723 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->117:56:17.723 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->217:56:17.723 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->317:56:17.723 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->417:56:17.723 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->517:56:17.723 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->617:56:17.723 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->717:56:17.723 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->817:56:17.723 [RxIoScheduler-2] INFO c.c.d.r.b.BackpressureDemo - produce ->917:56:17.774 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->017:56:17.824 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->117:56:17.875 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->217:56:17.925 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->317:56:17.976 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->417:56:18.027 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->517:56:18.078 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->617:56:18.129 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->717:56:18.179 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->817:56:18.230 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->9
复制代码


上面的程序有一个特点:生产者 observable 弹射数据的速度大于下游消费者 subscriber 接收处理数据的速度,但是由于数据量小,因此上面的程序运行起来没有出现问题。


简单修改一下生产者,将原来的弹射 10 条改成无限制地弹射,代码如下:


//被观察者(主题) Observable observable = Observable.create( new Observable.OnSubscribe<String>() { @Override public void call(Subscriber<? super String> subscriber) { //无限制地循环 for (int i = 0; ; i++) { //log.info("produce ->" + i); subscriber.onNext(String.valueOf(i)); } } });
复制代码


再次运行该演示程序后,抛出的异常如下:


Caused by: rx.exceptions.MissingBackpressureExceptionat rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.onNext(OperatorObserveOn.java:160)at rx.internal.operators.OperatorSubscribeOn$SubscribeOnSubscriber.onNext(OperatorSubscribeOn.java:74)at com.crazymaker.demo.rxJava.basic.BackpressureDemo$1.call(BackpressureDemo.java:24)at com.crazymaker.demo.rxJava.basic.BackpressureDemo$1.call(BackpressureDemo.java:19)at rx.Observable.unsafeSubscribe(Observable.java:10327)at rx.internal.operators.OperatorSubscribeOn$SubscribeOnSubscriber.call(OperatorSubscribeOn.java:100)at rx.internal.schedulers.CachedThreadScheduler$EventLoopWorker$1.call(CachedThreadScheduler.java:230) ... 9 more
复制代码


异常原因:由于上游 observable 流弹射数据的速度远远大于下游通过 subscriber 接收的速度,导致 observable 用于暂存弹射数据的队列空间耗尽,造成上游数据积压。

背压问题的几种应对模式

如何应对背压问题呢?在创建主题时可以使用 Observable 类的一个重载的 create 方法设置具体的背压模式,该方法的源代码如下:


 public static <T> Observable<T> create(Action1<Emitter<T>> emitter, Emitter.BackpressureMode backpressure) { return unsafeCreate(new OnSubscribeCreate<T>(emitter, backpressure)); }
复制代码


此方法的第二个参数用于指定一种背压模式。背压模式有多种,比较常用的有“最近模式”Emitter.BackpressureMode.LATEST。这种模式的含义为:如果消费跟不上,那么仅仅缓存最近弹射出来的数据,将老旧一点的数据直接丢弃。


使用“最近模式”背压,改写 4.8.1 节的测试用例,代码如下:


 /** *演示使用“最近模式”背压 */ @Test public void testBackpressure() throws InterruptedException { //主题实例,使用背压 Observable observable = Observable.create( new Action1<Emitter<String>> () { @Override public void call(Emitter<String> emitter) { //无限循环 for (int i = 0; ; i++) { //log.info("produce ->" + i); emitter.onNext(String.valueOf(i)); } } }, Emitter.BackpressureMode.LATEST); //订阅者(观察者) Action1<String> subscriber = new Action1<String>() { public void call(String s) { try { //每消费一次间隔50毫秒 Thread.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } log.info("consumer ->" + s); } }; //订阅: observable与subscriber之间依然通过subscribe()进行关联 observable .subscribeOn(Schedulers.io()) .observeOn(Schedulers.newThread()) .subscribe(subscriber); Thread.sleep(Integer.MAX_VALUE); }
复制代码


运行这个演示程序,部分输出的结果节选如下:


18:51:54.736 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->018:51:54.745 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->1//省略部分输出18:51:55.217 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->12318:51:55.220 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->12418:51:55.224 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->12518:51:55.228 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->12618:51:55.232 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->12718:51:55.236 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->733765218:51:55.240 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->733765318:51:55.244 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->7337654//省略部分输出18:51:55.595 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->733774718:51:55.598 [RxNewThreadScheduler-1] INFO c.c.d.r.b.BackpressureDemo - consumer ->14161628
复制代码


从输出的结果可以看到,上游主题连续不断地弹射,下游订阅者在接收完 127 后直接跳到了 7337652,其间弹射出来的几百万数据(相对旧一点的数据)就直接被丢弃了。


除了 Emitter.BackpressureMode.LATEST“最近模式”外,RxJava 在 Emitter<T>接口中通过一个枚举常量定义了以下几种背压模式:


enum BackpressureMode { /** *No backpressure is applied(无背压模式)*可能导致rx.exceptions.MissingBackpressureException异常*或者IllegalStateException异常 */ NONE, /** *如果消费者跟不上,就抛出rx.exceptions.MissingBackpressureException异常 */ ERROR, /** *缓存所有的onNext方法弹射出来的消息,等待消费者慢慢地消费 */ BUFFER, /** *如果下游消费跟不上,就丢弃onNext方法弹射出来的新消息 */ DROP, /** *如果消费者跟不上,就丢掉旧的消息,缓存onNext方法弹射出来的新消息 */ LATEST }
复制代码


对于以上 RxJava 背压模式,介绍如下:


(1)BackpressureMode.DROP:在这种模式下,Observable 主题使用固定大小为 128 的缓冲区。如果下游订阅者无法处理,流的第一个元素就会缓存下来,后续的会被丢弃。


(2)BackpressureMode.LATEST:这种模式与 BackpressureMode.DROP 类似,并且 Observable 主题也使用固定大小为 128 的缓冲区。BackpressureMode.LATEST 的缓存策略不同,使用最新的弹出元素替换缓冲区缓存的元素。当消费者可以处理下一个元素时,它收到的是 Observable 最近一次弹出的元素。


(3)BackpressureMode.NONE 和 BackpressureMode.ERROR:在这两种模式中发送的数据不使用背压。如果上游 observable 主题弹射数据的速度大于下游通过 subscriber 接收的速度,造成上游数据积压,就会抛出 MissingBackpressureException 异常。


(4)BackpressureMode.BUFFER:在这种模式下,有一个无限的缓冲区(初始化时是 128),下游消费不了的元素全部会放到缓冲区中。如果缓冲区中持续地积累,就会导致内存耗尽,抛出 OutOfMemoryException 异常。

本文给大家讲解的内容是 SpringCloudRPC 远程调用核心原理: RxJava 响应式编程框架,背压问题的几种应对模式

  1. 下篇文章给大家讲解的是 SpringCloudRPC 远程调用核心原理:Hystrix RPC 保护的原理;

  2. 觉得文章不错的朋友可以转发此文关注小编;

  3. 感谢大家的支持!

发布于: 2021 年 06 月 07 日阅读数: 18
用户头像

小Q

关注

还未添加个人签名 2020.06.30 加入

小Q 公众号:Java架构师联盟 作者多年从事一线互联网Java开发的学习历程技术汇总,旨在为大家提供一个清晰详细的学习教程,侧重点更倾向编写Java核心内容。如果能为您提供帮助,请给予支持(关注、点赞、分享)!

评论

发布
暂无评论
深入讲解RxJava响应式编程框架,背压问题的几种应对模式