微信公众号:bugstack 虫洞栈 | 1k+关注
跟着案例学 Netty,Netty4.x 案例从简单入门到应用实战,全篇 35 章节优秀案例+源码分析[基础篇(13)、中级篇(12)、高级篇(3 章+)、源码分析篇(6)],以上章节全部完成并不断持续更新中,欢迎关注学习 &下载专题源码
前言介绍
分析 Promise 之前我们先来看两个单词;Promise、Future
>Promise v. 许诺;承诺;答应;保证;使很可能;预示
Future n. 将来;未来;未来的事;将来发生的事;前景;前途;前程
他们的含义都是对未来即将要发生的事情做相应的处理,这也是在异步编程中非常常见的类名。
Netty 是一个异步网络处理框架,在实现中大量使用了 Future 机制,并在 Java 自带 Future 的基础上,增加了 Promise 机制。这两个实现类的目的都是为了使异步编程更加方便使用。
源码分析
1、了解 Java 并发包中的 Future
java 的并发包中提供 java.util.concurrent.Future 类,用于处理异步操作。在 Java 中 Future 是一个未来完成的异步操作,可以获得未来返回的值。如下案例,调用一个获取用户信息的方法,该方法会立刻返回 Future 对象,调用 Future.get()可以同步等待耗时方法的返回,也可以通过调用 future 的 cancel()取消 Future 任务。
class TestFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
TestFuture testFuture = new TestFuture();
Future<String> future = testFuture.queryUserInfo("10001"); //返回future
String userInfo = future.get();
System.out.println("查询用户信息:" + userInfo);
}
private Future<String> queryUserInfo(String userId) {
FutureTask<String> future = new FutureTask<>(() -> {
try {
Thread.sleep(1000);
return "微信公众号:bugstack虫洞栈 | 用户ID:" + userId;
} catch (InterruptedException ignored) {}
return "error";
});
new Thread(future).start();
return future;
}
}
复制代码
2、Netty 实现了自己的 Future
Netty 通过继承 java 并发包的 Future 来定义自己的 Future 接口,为 Future 加入的功能主要有添加、删除监听事件接口,最后由 Promise 实现。
>io.netty.util.concurrent.Future.java 中定义了一些列的异步编程方法 | 经常会使用的>b.bind(port).sync();
// 只有IO操作完成时才返回true
boolean isSuccess();
// 只有当cancel(boolean)成功取消时才返回true
boolean isCancellable();
// IO操作发生异常时,返回导致IO操作以此的原因,如果没有异常,返回null
Throwable cause();
// 向Future添加事件,future完成时,会执行这些事件,如果add时future已经完成,会立即执行监听事件
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 移除监听事件,future完成时,不会触发
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 等待future done
Future<V> sync() throws InterruptedException;
// 等待future done,不可打断
Future<V> syncUninterruptibly();
// 等待future完成
Future<V> await() throws InterruptedException;
// 等待future 完成,不可打断
Future<V> awaitUninterruptibly();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);
// 立刻获得结果,如果没有完成,返回null
V getNow();
// 如果成功取消,future会失败,导致CancellationException
@Override
boolean cancel(boolean mayInterruptIfRunning);
复制代码
3、Promise 机制
Netty 的 Future 与 Java 的 Future 虽然类名相同,但功能上略有不同,Netty 中引入了 Promise 机制。在 Java 的 Future 中,业务逻辑为一个 Callable 或 Runnable 实现类,该类的 call()或 run()执行完毕意味着业务逻辑的完结;而在 Promise 机制中,可以在业务逻辑中人工设置业务逻辑的成功与失败,这样更加方便的监控自己的业务逻辑。
>io.netty.util.concurrent.Promise.java |
public interface Promise<V> extends Future<V> {
// 设置future执行结果为成功
Promise<V> setSuccess(V result);
// 尝试设置future执行结果为成功,返回是否设置成功
boolean trySuccess(V result);
// 设置失败
Promise<V> setFailure(Throwable cause);
// 尝试设置future执行结果为失败,返回是否设置成功
boolean tryFailure(Throwable cause);
// 设置为不能取消
boolean setUncancellable();
// 源码中,以下为覆盖了Future的方法,例如;
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
}
复制代码
>TestPromise.java | 一个查询用户信息的 Promise 列子,加入监听再 operationComplete 完成后,获取查询信息
class TestPromise {
public static void main(String[] args) throws ExecutionException, InterruptedException {
TestPromise testPromise = new TestPromise();
Promise<String> promise = testPromise.queryUserInfo("10001");
promise.addListener(new GenericFutureListener<Future<? super String>>() {
@Override
public void operationComplete(Future<? super String> future) throws Exception {
System.out.println("addListener.operationComplete > 查询用户信息完成: " + future.get());
}
});
}
private Promise<String> queryUserInfo(String userId) {
NioEventLoopGroup loop = new NioEventLoopGroup();
// 创建一个DefaultPromise并返回,将业务逻辑放入线程池中执行
DefaultPromise<String> promise = new DefaultPromise<String>(loop.next());
loop.schedule(() -> {
try {
Thread.sleep(1000);
promise.setSuccess("微信公众号:bugstack虫洞栈 | 用户ID:" + userId);
return promise;
} catch (InterruptedException ignored) {
}
return promise;
}, 0, TimeUnit.SECONDS);
return promise;
}
}
复制代码
通过这个例子可以看到,Promise 能够在业务逻辑线程中通知 Future 成功或失败,由于 Promise 继承了 Netty 的 Future,因此可以加入监听事件。而 Future 和 Promise 的好处在于,获取到 Promise 对象后可以为其设置异步调用完成后的操作,然后立即继续去做其他任务。
4、Promise 类组织结构 &常用方法
>DefaultChannelPromise 类组织结构图 | 承接 Java 并发包 Future 并增强实现
Netty 中 DefalutPromise 是一个非常常用的类,这是 Promise 实现的基础。DefaultChannelPromise 是 DefalutPromise 的子类,加入了 channel 这个属性。
DefaultPromise | 使用
在 Netty 中使用到 Promise 的地方会非常多,例如在前面一节《一行简单的 writeAndFlush 都做了哪些事》分析 HeadContext.write 中 unsafe.write(msg, promise);结合这一章节可以继续深入了解 Netty 的异步框架原理。另外,服务器/客户端启动时的注册任务,最终会调用 unsafe 的 register,调用过程中会传入一个 promise,unsafe 进行事件的注册时调用 promise 可以设置成功/失败。
>SingleThreadEventLoop.java | 注册服务事件循环组
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
复制代码
DefaultPromise | 实现
DefaultChannelPromise 提供的功能可以分为两个部分;
>AbstractFuture.java | get()方法
public abstract class AbstractFuture<V> implements Future<V> {
@Override
public V get() throws InterruptedException, ExecutionException {
await();
Throwable cause = cause();
if (cause == null) {
return getNow();
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (await(timeout, unit)) {
Throwable cause = cause();
if (cause == null) {
return getNow();
}
if (cause instanceof CancellationException) {
throw (CancellationException) cause;
}
throw new ExecutionException(cause);
}
throw new TimeoutException();
}
}
复制代码
DefaultPromise 父类 AbstractFuture 提供了两个 get 方法;1、无参数的 get 会阻塞等待;2、有参数的 get 会等待指定事件,若未结束抛出超时异常。
----
>DefaultPromise.java | DefaultPromise.await()方法
@Override
public Promise<V> await() throws Interrupt
// 判断Future任务是否结束,内部根据result是否为null判断,setSuccess或setFailure时会通过CAS修改result
if (isDone()) {
return this;
}
// 线程是否被中断
if (Thread.interrupted()) {
throw new InterruptedException(toS
}
// 检查当前线程是否与线程池运行的线程是一个
checkDeadLock();
synchronized (this) {
while (!isDone()) {
/* waiters计数加1
* private void incWaiters() {
* if (waiters == Short.MAX_VALUE) {
* throw new IllegalStateException("too many waiters: " + this);
* }
* ++waiters;
* }
*/
incWaiters();
try {
// Object的方法,让出CPU,加入等待队列
wait();
} finally {
// waiters计数减1
decWaiters();
}
}
}
return this;
}
复制代码
await(long timeout, TimeUnit unit)与 awite 类似,只是调用了 Object 对象的 wait(long timeout, int nanos)方法 awaitUninterruptibly()方法在内部 catch 住了等待线程的中断异常,因此不会抛出中断异常。
----
>DefaultPromise.java | DefaultPromise.addListener0() / DefaultPromise.removeListener0()
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners == null) {
listeners = listener;
} else if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).add(listener);
} else {
listeners = new DefaultFutureListeners((GenericFutureListener<?>) listeners, listener);
}
}
private void removeListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners instanceof DefaultFutureListeners) {
((DefaultFutureListeners) listeners).remove(listener);
} else if (listeners == listener) {
listeners = null;
}
}
复制代码
----
>DefaultPromise.java | DefaultPromise.notifyListener0() 通知侦听器
@SuppressWarnings({ "unchecked", "rawtypes" })
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
l.operationComplete(future);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
}
复制代码
在添加监听器时,如果任务刚好执行完毕,则会立即触发监听事件,触发监听通过 notifyListeners()实现。
addListener 和 setSuccess 都会调用 notifyListeners()和 Promise 内的线程池当前执行的线程是同一个线程,则放在线程池中执行,否则提交到线程池去执行;例如,main 线程中调用 addListener 时任务完成,notifyListeners()执行回调,会提交到线程池中执行;而如果是执行 Future 任务的线程池中 setSuccess()时调用 notifyListeners(),会放在当前线程中执行。
----
>DefaultPromise.java | DefaultPromise.setSuccess0()、setFailure0() 唤起等待线程通知成功/失败
// 设置成功后唤醒等待线程
private boolean setSuccess0(V result) {
return setValue0(result == null ? SUCCESS : result);
}
// 设置成功后唤醒等待线程
private boolean setFailure0(Throwable cause) {
return setValue0(new CauseHolder(checkNotNull(cause, "cause")));
}
// 通知成功时将结果保存在变量result,通知失败时,使用CauseHolder包装Throwable赋值给result
// RESULT_UPDATER 是一个使用CAS更新内部属性result的类,
// 如果result为null或UNCANCELLABLE,更新为成功/失败结果;UNCANCELLABLE是不可取消状态
private boolean setValue0(Object objResult) {
if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
// 检查是否有服务,如果有,通知他们。
if (checkNotifyWaiters()) {
notifyListeners(); // 通知
}
return true;
}
return false;
}
复制代码
Future 任务在执行完成后调用 setSuccess()或 setFailure()通知 Future 执行结果;主要逻辑是:修改 result 的值,若有等待线程则唤醒,通知监听事件。
----
DefaultChannelPromise 实现
/**
* The default {@link ChannelPromise} implementation. It is recommended to use {@link Channel#newPromise()} to create
* a new {@link ChannelPromise} rather than calling the constructor explicitly.
*/
public class DefaultChannelPromise extends DefaultPromise<Void> implements ChannelPromise, FlushCheckpoint {
private final Channel channel;
private long checkpoint;
...
}
复制代码
从继承关系可以看到 DefaultChannelPromise 是 DefaultPromise 的实现类,内部维护了一个通道变量 Channel。
另外还实现了 FlushCheckpoint 接口,给 ChannelFlushPromiseNotifier 使用,我们可以将 ChannelFuture 注册到 ChannelFlushPromiseNotifier 类,当有数据写入或到达 checkpoint 时使用。
interface FlushCheckpoint {
long flushCheckpoint();
void flushCheckpoint(long checkpoint)
ChannelPromise promise();
}
复制代码
------------
评论