写点什么

RxJava 的操作符

作者:周杰伦本人
  • 2022-10-20
    贵州
  • 本文字数:1836 字

    阅读完需:约 1 分钟

RxJava 的操作符

创建型操作符

interval 操作符是按照时间间隔来进行输出


Observable.interval(10, TimeUnit.MILLISECONDS)                .subscribe(aLong -> log.info(aLong.toString()));
复制代码


这段代码每隔 10 毫秒输出一次


defer 操作符是延迟创建,当有观察者订阅的时候才会输出消息


Observable observable = Observable.just(foo.get());       Observable dObservable = Observable.defer(() -> Observable.just(foo.get()));
复制代码


just 操作符用来创建一个主题,并将参数弹出。


Observable.just("hello world" )                .subscribe(s -> log.info("just string->" + s));
复制代码


from 操作符是以数组作为输入,创建主题对象,并将数组的元素一个一个输出



String[] items = {"b", "c", "d", "e"}; Observable.from(items) .subscribe(s -> log.info("just string->" + s));
复制代码


range 是整数范围作为输入,包括的区间的上限和下限


bservable.range(1, 8)                .subscribe(i -> log.info("just int->" + i));
复制代码


这行代码会输出 1 到 8 的所有整数

转换操作符

map 操作符是转换的方法,接元素进行转换后弹出


这段代码是将所有元素乘以 4 之后输出


Observable.range(1, 4)                .map(i -> i *i)                .subscribe(i -> log.info(i.toString())); 
复制代码


scan 操作符是将每个数据累积,它会将上一个项的数据累积作为下一项的输入数据。


flatMap 操作符是将元素变成一个新的主题后输出


Observable.range(1, 4)                .flatMap(i -> Observable.range(1, i).toList())                .subscribe(list -> log.info(list.toString()));
复制代码


这段代码输出结果为四个数组

过滤型操作符

过滤型操作符顾名思义就是对结果进行过滤

Filter 操作符

这段代码输出的是能被 5 整除的数


   Observable.range(1, 20)                .filter(integer -> integer%5==0)                .subscribe(i -> log.info("filter int->" + i));         
复制代码


distinct 是对消息重复数据过滤,已经发出去的元素不再发出


Observable.just("apple", "pair", "banana", "apple", "pair")                .distinct()  //使用distinct过滤重复元素                .subscribe(s -> log.info("distinct s->" + s));
复制代码


输出结果为 apple pair 和 banana

聚合操作符

count 操作符就是数据项进行统计,然后输出


String[] items = {"one", "two", "three","fore"};        Integer count = Observable                .from(items)                .count()                .toBlocking().single();        log.info("计数的结果为 {}",count);
复制代码


输出结果为 4,Observable.toBlocking()是返回 BlockingObservable 阻塞实例,然后 single()方法是阻塞当前线程,直到输出唯一的一个元素,如果有多个就会抛出异常。


reduce 操作符和 scan 操作符差不多,scan 操作符每次都要输出结果,reduce 操作符只会输出最后的结果

其他操作符

take 操作符是挑选前 n 个元素,skip 操作符是跳过前 n 个元素


window 操作符是按照固定数量 n 进行分组


List<Integer> srcList = Arrays.asList(10, 11, 20, 21, 30, 31);
Observable.from(srcList) .window(3) .flatMap(o -> o.toList()) .subscribe(list -> log.info(list.toString()));
复制代码


window(3)就是对数组按 3 个一组进行分组,window 方法还可以有两个参数,window(3, 1)就是按照 3 个一组,间隔为 1

HystrixCommand

HystrixCommand 用来封装 RPC 的调用,它有异步执行能力和同步执行能力,先进行缓存是否命中,如果启用了缓存,就可以使用缓存响应请求,、再判断熔断器是否打开,如果熔断开启了直接调用 HystrixCommand 的 getFallback()方法进行服务降级处理,如果熔断器没有开启就判断线程池是否满了,然后没有满就开始执行 run 方法,如果满了同样执行 getFallback()方法进行服务降级处理,自定义 HystrixCommand 类的时候可以重写 run()方法和 getFallback()方法,当出现错误的时候执行 getFallback()方法


Hystrix 的健康统计滑动窗口的执行过程是首先 HystrixCommand 的执行结果会被弹出,然后桶计数流会按照固定时间长度划分滚动窗口,然后按照执行结果进行累积统计

总结

RxJava 的操作符有创建型操作符、转换操作符和过滤型操作符,然后分别对具体的方法进行了介绍,除此之外还有别的运算符,有聚合操作符和其他操作符包括 window 操作符,hystrix 利用了 window 操作符进行健康统计等等,HystrixCommand 是重要的一个类,主要实现它的 run 方法和 getFallback()。

发布于: 刚刚阅读数: 3
用户头像

还未添加个人签名 2020-02-29 加入

公众号《盼盼小课堂》,多平台优质博主

评论

发布
暂无评论
RxJava的操作符_10月月更_周杰伦本人_InfoQ写作社区