SpringCloudRPC 调用核心原理:RxJava 响应式编程框架,其他操作符
图 4-12 使用 window 操作符创建固定数量窗口(滚动窗口)
一个使用 window 操作符以固定数量进行元素分组的示例程序如下:
package com.crazymaker.demo.rxJava.basic;
//省略 import
@Slf4jpublic class WindowDemo
{
/**
*演示 window 创建操作符创建滚动窗口
*/
@Test
public void simpleWindowObserverDemo()
{
List<Integer> srcList = Arrays.asList(10, 11, 20, 21, 30, 31);
Observable.from(srcList)
.window(3) //以固定数量分组
.flatMap(o -> o.toList())
.subscribe(list -> log.info(list.toString()));
}
...
}
运行这个演示程序,输出的结果如下:
[main] INFO c.c.d.rxJava.basic.WindowDemo - [10, 11, 20]
[main] INFO c.c.d.rxJava.basic.WindowDemo - [21, 30, 31]
在使用 window 进行分组时,不同窗口的元素还可以重叠,可以理解成滑动窗口。
创建重叠窗口使用函数 window(int count,int skip),其中第一个参数为窗口的元素个数,第二个参数为下一个窗口跳过的元素个数。使用 window 操作符创建重叠窗口的处理流程如图 4-13 所示。
图 4-13 使用 window 操作符创建重叠窗口(滑动窗口)
使用 window 操作符以固定数量创建重叠窗口的示例程序如下:
package com.crazymaker.demo.rxJava.basic;
//省略 import
@Slf4j
public class WindowDemo
{
...
/**
*演示 window 创建操作符创建滑动窗口
*/
@Test
public void windowObserverDemo()
{
List<Integer> srcList = Arrays.asList(10, 11, 20, 21, 30, 31);
Observable.from(srcList)
.window(3, 1)
.flatMap(o -> o.toList())
.subscribe(list -> log.info(list.toString()));
}
...
}
运行这个演示程序,输出的结果如下:
[main] INFO c.c.d.rxJava.basic.WindowDemo - [10, 11, 20]
[main] INFO c.c.d.rxJava.basic.WindowDemo - [11, 20, 21]
[main] INFO c.c.d.rxJava.basic.WindowDemo - [20, 21, 30]
[main] INFO c.c.d.rxJava.basic.WindowDemo - [21, 30, 31]
[main] INFO c.c.d.rxJava.basic.WindowDemo - [30, 31]
[main] INFO c.c.d.rxJava.basic.WindowDemo - [31]
RxJava 的窗口还可以按照固定时间间隔进行分组。一个使用 window 操作符以固定时间间隔创建不重叠窗口(滚动窗口)的示例程序如下:
package com.crazymaker.demo.rxJava.basic;
//省略 import
@Slf4j
public class WindowDemo
{
...
/**
*演示 window 创建操作符创建时间窗口
*/
@Test
public void timeWindowObserverDemo() throws InterruptedException
{
Observable eventStream = Observable
.interval(100, TimeUnit.MILLISECONDS);
eventStream.window(300, TimeUnit.MILLISECONDS)
.flatMap(o -> ((Observable<Integer>)
o).toList())
.subscribe(list -> log.info(list.toString()));
Thread.sleep(Integer.MAX_VALUE);
}
...
}
评论