写点什么

netty 系列之: 可以自动通知执行结果的 Future, 有见过吗?

作者:程序那些事
  • 2022 年 1 月 21 日
  • 本文字数:3098 字

    阅读完需:约 10 分钟

netty系列之:可以自动通知执行结果的Future,有见过吗?

简介

在我的心中,JDK 有两个经典版本,第一个就是现在大部分公司都在使用的 JDK8,这个版本引入了 Stream、lambda 表达式和泛型,让 JAVA 程序的编写变得更加流畅,减少了大量的冗余代码。


另外一个版本要早点,还是 JAVA 1.X 的时代,我们称之为 JDK1.5,这个版本引入了 java.util.concurrent 并发包,从此在 JAVA 中可以愉快的使用异步编程。


虽然先 JDK 已经发展到了 17 版本,但是并发这一块的变动并不是很大。受限于 JDK 要保持稳定的需求,所以 concurrent 并发包提供的功能并不能完全满足某些业务场景。所以依赖于 JDK 的包自行研发了属于自己的并发包。


当然,netty 也不例外,一起来看看 netty 并发包都有那些优势吧。

JDK 异步缘起

怎么在 java 中创建一个异步任务,或者开启一个异步的线程,每个人可能都有属于自己的回答。


大家第一时间可能想到的是创建一个实现 Runnable 接口的类,然后将其封装到 Thread 中运行,如下所示:


new Thread(new(RunnableTask())).start()
复制代码


每次都需要 new 一个 Thread 是 JDK 大神们不可接受的,于是他们产生了一个将 thread 调用进行封装的想法,而这个封装类就叫做 Executor.


Executor 是一个 interface,首先看一下这个 interface 的定义:


public interface Executor {
void execute(Runnable command);}
复制代码


接口很简单,就是定义了一个 execute 方法来执行传入的 Runnable 命令。


于是我们可以这样来异步开启任务:


   Executor executor = anExecutor;   executor.execute(new RunnableTask1());   executor.execute(new RunnableTask2());
复制代码


看到这里,聪明的小伙伴可能就要问了,好像不对呀,Executor 自定义了 execute 接口,好像跟异步和多线程并没有太大的关系呀?


别急,因为 Executor 是一个接口,所以我们可以有很多实现。比如下面的直接执行 Runnable,让 Runnable 在当前线程中执行:


 class DirectExecutor implements Executor {   public void execute(Runnable r) {     r.run();   } }
复制代码


又比如下面的在一个新的线程中执行 Runnable:


 class ThreadPerTaskExecutor implements Executor {   public void execute(Runnable r) {     new Thread(r).start();   } }
复制代码


又比如下面的将多个任务存放在一个 Queue 中,执行完一个任务再执行下一个任务的序列执行:


 class SerialExecutor implements Executor {   final Queue<Runnable> tasks = new ArrayDeque<Runnable>();   final Executor executor;   Runnable active;
SerialExecutor(Executor executor) { this.executor = executor; }
public synchronized void execute(final Runnable r) { tasks.offer(new Runnable() { public void run() { try { r.run(); } finally { scheduleNext(); } } }); if (active == null) { scheduleNext(); } }
protected synchronized void scheduleNext() { if ((active = tasks.poll()) != null) { executor.execute(active); } } }
复制代码


这些 Executor 都非常完美。但是他们都只能提交任务,提交任务之后就什么都不知道了。这对于好奇的宝宝们是不可忍受的,因为我们需要知道执行的结果,或者对执行任务进行管控。


于是就有了 ExecutorService。ExecutorService 也是一个接口,不过他提供了 shutdown 方法来停止接受新的任务,和 isShutdown 来判断关闭的状态。


除此之外,它还提供了单独调用任务的 submit 方法和批量调用任务的 invokeAll 和 invokeAny 方法。


既然有了 execute 方法,submit 虽然和 execute 方法基本上执行了相同的操作,但是在方法参数和返回值上有稍许区别。


首先是返回值,submit 返回的是 Future,Future 表示异步计算的结果。 它提供了检查计算是否完成、等待其完成以及检索计算结果的方法。 Future 提供了 get 方法,用来获取计算结果。但是如果调用 get 方法的同时,计算结果并没有准备好,则会发生阻塞。


其次是 submit 的参数,一般来说只有 Callable 才会有返回值,所以我们常用的调用方式是这样的:


<T> Future<T> submit(Callable<T> task);
复制代码


如果我们传入 Runnable,那么虽然也返回一个 Future,但是返回的值是 null:


Future<?> submit(Runnable task);
复制代码


如果我又想传入 Runnable,又想 Future 有返回值怎么办呢?


古人告诉我们,鱼和熊掌不可兼得!但是现在是 2021 年了,有些事情是可以发生改变了:


<T> Future<T> submit(Runnable task, T result);
复制代码


上面我们可以传入一个 result,当 Future 中的任务执行完毕之后直接将 result 返回。


既然 ExecutorService 这么强大,如何创建 ExecutorService 呢?


最简单的办法就是用 new 去创建对应的实例。但是这样不够优雅,于是 JDK 提供了一个 Executors 工具类,他提供了多种创建不同 ExecutorService 的静态方法,非常好用。

netty 中的 Executor

为了兼容 JDK 的并发框架,虽然 netty 中也有 Executor,但是 netty 中的 Executor 都是从 JDK 的并发包中衍生出来的。


具体而言,netty 中的 Executor 叫做 EventExecutor,他继承自 EventExecutorGroup:


public interface EventExecutor extends EventExecutorGroup 
复制代码


而 EventExecutorGroup 又继承自 JDK 的 ScheduledExecutorService:


public interface EventExecutorGroup extends ScheduledExecutorService, Iterable<EventExecutor>
复制代码


为什么叫做 Group 呢?这个 Group 的意思是它里面包含了一个 EventExecutor 的集合。这些结合中的 EventExecutor 通过 Iterable 的 next 方法来进行遍历的。


这也就是为什么 EventExecutorGroup 同时继承了 Iterable 类。


然后 netty 中的其他具体 Executor 的实现再在 EventExecutor 的基础之上进行扩展。从而得到了 netty 自己的 EventExecutor 实现。

Future 的困境和 netty 的实现

那么 JDK 中的 Future 会有什么问题呢?前面我们也提到了 JDK 中的 Future 虽然保存了计算结果,但是我们要获取的时候还是需要通过调用 get 方法来获取。


但是如果当前计算结果还没出来的话,get 方法会造成当前线程的阻塞。


别怕,这个问题在 netty 中被解决了。


先看下 netty 中 Future 的定义:


public interface Future<V> extends java.util.concurrent.Future<V> 
复制代码


可以看到 netty 中的 Future 是继承自 JDK 的 Future。同时添加了 addListener 和 removeListener,以及 sync 和 await 方法。


先讲一下 sync 和 await 方法,两者都是等待 Future 执行结束。不同之处在于,如果在执行过程中,如果 future 失败了,则会抛出异常。而 await 方法不会。


那么如果不想同步调用 Future 的 get 方法来获得计算结果。则可以给 Future 添加 listener。


这样当 Future 执行结束之后,会自动通知 listener 中的方法,从而实现异步通知的效果,其使用代码如下:


EventExecutorGroup group = new DefaultEventExecutorGroup(4); // 4 threadsFuture<?> f = group.submit(new Runnable() { ... });f.addListener(new FutureListener<?> {  public void operationComplete(Future<?> f) {    ..  }});
复制代码


还有一个问题,每次我们提交任务的时候,都需要创建一个 EventExecutorGroup,有没有不需要创建就可以提交任务的方法呢?


有的!


netty 为那些没有时间创建新的 EventExecutorGroup 的同志们,特意创建一个全局的 GlobalEventExecutor,这是可以直接使用的:


GlobalEventExecutor.INSTANCE.execute(new Runnable() { ... });
复制代码


GlobalEventExecutor 是一个单线程的任务执行器,每隔一秒钟回去检测有没有新的任务,有的话就提交到 executor 执行。

总结

netty 为 JDK 的并发包提供了非常有用的扩展。大家可以直接使用。


本文已收录于 http://www.flydean.com/46-netty-future-executor/

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!

发布于: 刚刚阅读数: 3
用户头像

关注公众号:程序那些事,更多精彩等着你! 2020.06.07 加入

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧,尽在公众号:程序那些事!

评论

发布
暂无评论
netty系列之:可以自动通知执行结果的Future,有见过吗?