写点什么

netty 案例,netty4.1 源码分析篇六《Netty 异步架构监听类 Promise 源码分析》

用户头像
小傅哥
关注
发布于: 2020 年 08 月 22 日

微信公众号:bugstack 虫洞栈 | 1k+关注

跟着案例学 Netty,Netty4.x 案例从简单入门到应用实战,全篇 35 章节优秀案例+源码分析[基础篇(13)、中级篇(12)、高级篇(3 章+)、源码分析篇(6)],以上章节全部完成并不断持续更新中,欢迎关注学习 &下载专题源码


前言介绍

分析 Promise 之前我们先来看两个单词;Promise、Future

>Promise v. 许诺;承诺;答应;保证;使很可能;预示

Future n. 将来;未来;未来的事;将来发生的事;前景;前途;前程


他们的含义都是对未来即将要发生的事情做相应的处理,这也是在异步编程中非常常见的类名。


Netty 是一个异步网络处理框架,在实现中大量使用了 Future 机制,并在 Java 自带 Future 的基础上,增加了 Promise 机制。这两个实现类的目的都是为了使异步编程更加方便使用。


源码分析


1、了解 Java 并发包中的 Future

java 的并发包中提供 java.util.concurrent.Future 类,用于处理异步操作。在 Java 中 Future 是一个未来完成的异步操作,可以获得未来返回的值。如下案例,调用一个获取用户信息的方法,该方法会立刻返回 Future 对象,调用 Future.get()可以同步等待耗时方法的返回,也可以通过调用 future 的 cancel()取消 Future 任务。


class TestFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException { TestFuture testFuture = new TestFuture(); Future<String> future = testFuture.queryUserInfo("10001"); //返回future String userInfo = future.get(); System.out.println("查询用户信息:" + userInfo); }
private Future<String> queryUserInfo(String userId) { FutureTask<String> future = new FutureTask<>(() -> { try { Thread.sleep(1000); return "微信公众号:bugstack虫洞栈 | 用户ID:" + userId; } catch (InterruptedException ignored) {} return "error"; }); new Thread(future).start(); return future; }
}
复制代码


2、Netty 实现了自己的 Future

Netty 通过继承 java 并发包的 Future 来定义自己的 Future 接口,为 Future 加入的功能主要有添加、删除监听事件接口,最后由 Promise 实现。


>io.netty.util.concurrent.Future.java 中定义了一些列的异步编程方法 | 经常会使用的>b.bind(port).sync();


// 只有IO操作完成时才返回trueboolean isSuccess();// 只有当cancel(boolean)成功取消时才返回trueboolean isCancellable();// IO操作发生异常时,返回导致IO操作以此的原因,如果没有异常,返回nullThrowable cause();// 向Future添加事件,future完成时,会执行这些事件,如果add时future已经完成,会立即执行监听事件Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);// 移除监听事件,future完成时,不会触发Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);// 等待future doneFuture<V> sync() throws InterruptedException;// 等待future done,不可打断Future<V> syncUninterruptibly();// 等待future完成Future<V> await() throws InterruptedException;// 等待future 完成,不可打断Future<V> awaitUninterruptibly();boolean await(long timeout, TimeUnit unit) throws InterruptedException;boolean await(long timeoutMillis) throws InterruptedException;boolean awaitUninterruptibly(long timeout, TimeUnit unit);boolean awaitUninterruptibly(long timeoutMillis);// 立刻获得结果,如果没有完成,返回nullV getNow();// 如果成功取消,future会失败,导致CancellationException@Overrideboolean cancel(boolean mayInterruptIfRunning);
复制代码


3、Promise 机制

Netty 的 Future 与 Java 的 Future 虽然类名相同,但功能上略有不同,Netty 中引入了 Promise 机制。在 Java 的 Future 中,业务逻辑为一个 Callable 或 Runnable 实现类,该类的 call()或 run()执行完毕意味着业务逻辑的完结;而在 Promise 机制中,可以在业务逻辑中人工设置业务逻辑的成功与失败,这样更加方便的监控自己的业务逻辑。


>io.netty.util.concurrent.Promise.java |


public interface Promise<V> extends Future<V> {
// 设置future执行结果为成功 Promise<V> setSuccess(V result); // 尝试设置future执行结果为成功,返回是否设置成功 boolean trySuccess(V result);
// 设置失败 Promise<V> setFailure(Throwable cause);
// 尝试设置future执行结果为失败,返回是否设置成功 boolean tryFailure(Throwable cause);
// 设置为不能取消 boolean setUncancellable(); // 源码中,以下为覆盖了Future的方法,例如; Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener); @Override Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
}
复制代码


>TestPromise.java | 一个查询用户信息的 Promise 列子,加入监听再 operationComplete 完成后,获取查询信息


class TestPromise {
public static void main(String[] args) throws ExecutionException, InterruptedException { TestPromise testPromise = new TestPromise(); Promise<String> promise = testPromise.queryUserInfo("10001"); promise.addListener(new GenericFutureListener<Future<? super String>>() { @Override public void operationComplete(Future<? super String> future) throws Exception { System.out.println("addListener.operationComplete > 查询用户信息完成: " + future.get()); } }); }
private Promise<String> queryUserInfo(String userId) { NioEventLoopGroup loop = new NioEventLoopGroup(); // 创建一个DefaultPromise并返回,将业务逻辑放入线程池中执行 DefaultPromise<String> promise = new DefaultPromise<String>(loop.next()); loop.schedule(() -> { try { Thread.sleep(1000); promise.setSuccess("微信公众号:bugstack虫洞栈 | 用户ID:" + userId); return promise; } catch (InterruptedException ignored) { } return promise; }, 0, TimeUnit.SECONDS); return promise; }
}
复制代码


通过这个例子可以看到,Promise 能够在业务逻辑线程中通知 Future 成功或失败,由于 Promise 继承了 Netty 的 Future,因此可以加入监听事件。而 Future 和 Promise 的好处在于,获取到 Promise 对象后可以为其设置异步调用完成后的操作,然后立即继续去做其他任务。


4、Promise 类组织结构 &常用方法


>DefaultChannelPromise 类组织结构图 | 承接 Java 并发包 Future 并增强实现



Netty 中 DefalutPromise 是一个非常常用的类,这是 Promise 实现的基础。DefaultChannelPromise 是 DefalutPromise 的子类,加入了 channel 这个属性。


DefaultPromise | 使用

在 Netty 中使用到 Promise 的地方会非常多,例如在前面一节《一行简单的 writeAndFlush 都做了哪些事》分析 HeadContext.write 中 unsafe.write(msg, promise);结合这一章节可以继续深入了解 Netty 的异步框架原理。另外,服务器/客户端启动时的注册任务,最终会调用 unsafe 的 register,调用过程中会传入一个 promise,unsafe 进行事件的注册时调用 promise 可以设置成功/失败。


>SingleThreadEventLoop.java | 注册服务事件循环组


@Overridepublic ChannelFuture register(Channel channel) {	return register(new DefaultChannelPromise(channel, this));}
@Overridepublic ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise;}
复制代码


DefaultPromise | 实现

DefaultChannelPromise 提供的功能可以分为两个部分;

  • 为调用者提供 get()和 addListener()用于获取 Future 任务执行结果和添加监听事件。

  • 为业务处理任务提供 setSuccess()等方法设置任务的成功或失败。


>AbstractFuture.java | get()方法


public abstract class AbstractFuture<V> implements Future<V> {
@Override public V get() throws InterruptedException, ExecutionException { await();
Throwable cause = cause(); if (cause == null) { return getNow(); } if (cause instanceof CancellationException) { throw (CancellationException) cause; } throw new ExecutionException(cause); }
@Override public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (await(timeout, unit)) { Throwable cause = cause(); if (cause == null) { return getNow(); } if (cause instanceof CancellationException) { throw (CancellationException) cause; } throw new ExecutionException(cause); } throw new TimeoutException(); }}
复制代码


DefaultPromise 父类 AbstractFuture 提供了两个 get 方法;1、无参数的 get 会阻塞等待;2、有参数的 get 会等待指定事件,若未结束抛出超时异常。


----


>DefaultPromise.java | DefaultPromise.await()方法


@Overridepublic Promise<V> await() throws Interrupt	// 判断Future任务是否结束,内部根据result是否为null判断,setSuccess或setFailure时会通过CAS修改result    if (isDone()) {        return this;    }	// 线程是否被中断    if (Thread.interrupted()) {        throw new InterruptedException(toS    }	// 检查当前线程是否与线程池运行的线程是一个    checkDeadLock();    synchronized (this) {        while (!isDone()) {		   /* waiters计数加1			* private void incWaiters() {			*   if (waiters == Short.MAX_VALUE) {			*       throw new IllegalStateException("too many waiters: " + this);			*   }			*   ++waiters;			* }			*/            incWaiters();            try {				// Object的方法,让出CPU,加入等待队列                wait();            } finally {				// waiters计数减1                decWaiters();            }        }    }    return this;}
复制代码

await(long timeout, TimeUnit unit)与 awite 类似,只是调用了 Object 对象的 wait(long timeout, int nanos)方法 awaitUninterruptibly()方法在内部 catch 住了等待线程的中断异常,因此不会抛出中断异常。


----


>DefaultPromise.java | DefaultPromise.addListener0() / DefaultPromise.removeListener0()


private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {    if (listeners == null) {        listeners = listener;    } else if (listeners instanceof DefaultFutureListeners) {        ((DefaultFutureListeners) listeners).add(listener);    } else {        listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);    }}private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {    if (listeners instanceof DefaultFutureListeners) {        ((DefaultFutureListeners) listeners).remove(listener);    } else if (listeners == listener) {        listeners = null;    }}
复制代码
  • addListener0 方法被调用时,将传入的回调类传入到 listeners 对象中,如果监听多于 1 个,会创建 DefaultFutureListeners 对象将回调方法保存在一个数组中。


  • removeListener0 会将 listeners 设置为 null(只有一个时)或从数组中移除(多个回调时)。


----


>DefaultPromise.java | DefaultPromise.notifyListener0() 通知侦听器


@SuppressWarnings({ "unchecked", "rawtypes" })private static void notifyListener0(Future future, GenericFutureListener l) {    try {        l.operationComplete(future);    } catch (Throwable t) {        if (logger.isWarnEnabled()) {            logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);        }    }}
复制代码


  • 在添加监听器时,如果任务刚好执行完毕,则会立即触发监听事件,触发监听通过 notifyListeners()实现。

  • addListener 和 setSuccess 都会调用 notifyListeners()和 Promise 内的线程池当前执行的线程是同一个线程,则放在线程池中执行,否则提交到线程池去执行;例如,main 线程中调用 addListener 时任务完成,notifyListeners()执行回调,会提交到线程池中执行;而如果是执行 Future 任务的线程池中 setSuccess()时调用 notifyListeners(),会放在当前线程中执行。


  • 内部维护了 notifyingListeners 用来记录是否已经触发过监听事件,只有未触发过且监听列表不为空,才会依次便利并调用 operationComplete


----


>DefaultPromise.java | DefaultPromise.setSuccess0()、setFailure0() 唤起等待线程通知成功/失败


// 设置成功后唤醒等待线程private boolean setSuccess0(V result) {    return setValue0(result == null ? SUCCESS : result);}
// 设置成功后唤醒等待线程private boolean setFailure0(Throwable cause) { return setValue0(new CauseHolder(checkNotNull(cause, "cause")));}
// 通知成功时将结果保存在变量result,通知失败时,使用CauseHolder包装Throwable赋值给result// RESULT_UPDATER 是一个使用CAS更新内部属性result的类,// 如果result为null或UNCANCELLABLE,更新为成功/失败结果;UNCANCELLABLE是不可取消状态private boolean setValue0(Object objResult) { if (RESULT_UPDATER.compareAndSet(this, null, objResult) || RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) { // 检查是否有服务,如果有,通知他们。 if (checkNotifyWaiters()) { notifyListeners(); // 通知 } return true; } return false;}
复制代码


Future 任务在执行完成后调用 setSuccess()或 setFailure()通知 Future 执行结果;主要逻辑是:修改 result 的值,若有等待线程则唤醒,通知监听事件。


----


DefaultChannelPromise 实现


/** * The default {@link ChannelPromise} implementation.  It is recommended to use {@link Channel#newPromise()} to create * a new {@link ChannelPromise} rather than calling the constructor explicitly. */public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
private final Channel channel; private long checkpoint; ...}
复制代码


  • 从继承关系可以看到 DefaultChannelPromise 是 DefaultPromise 的实现类,内部维护了一个通道变量 Channel。

  • 另外还实现了 FlushCheckpoint 接口,给 ChannelFlushPromiseNotifier 使用,我们可以将 ChannelFuture 注册到 ChannelFlushPromiseNotifier 类,当有数据写入或到达 checkpoint 时使用。


interface FlushCheckpoint {    long flushCheckpoint();    void flushCheckpoint(long checkpoint)    ChannelPromise promise();}
复制代码


------------


发布于: 2020 年 08 月 22 日阅读数: 114
用户头像

小傅哥

关注

沉淀、分享、成长,让自己和他人都有所收获 2019.04.03 加入

作者小傅哥,一线互联网 java 工程师、架构师,开发过交易&营销、写过运营&活动、设计过中间件也倒腾过中继器、IO板卡。不只是写Java语言,也搞过C#、PHP,是一个技术活跃的折腾者。

评论

发布
暂无评论
netty案例,netty4.1源码分析篇六《Netty异步架构监听类Promise源码分析》