系列文章:
源码分析 -Netty:开篇
源码分析 -Netty:多线程在 Netty 中的应用
摘要
前面一篇介绍了多线程在 Netty 中的大概使用情况,本篇将结合源码,详细描述使用方式,以及值得我们思考、学习和借鉴的地方。
一 synchronized 使用
关键字 synchronized,我们在并发编程的艺术系列文章中有过描述,用于保证在同一时刻,只有一个线程能够执行某个方法或代码块。同步的作用,既有互斥,也有保证共享变量的可见性:当某个线程修改了变量值并释放锁后,其他线程可以立即获取被修改变量的最新值。
以 ServerBootstrap 进行分析。在类中,定义了一个 final 变量 childOptions,值为 LinkedHashMap。在构造方法中,对 bootstrap.childOptions 进行了同步。这是因为,LinkedHashMap 是非线程安全的,所以当存在多线程并发创建 ServerBootstrap 的实例时,访问和修改这个变量,必须在外部做好同步,否则会导致不可预料的后果。
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap();
private ServerBootstrap(ServerBootstrap bootstrap) {
super(bootstrap);
this.childGroup = bootstrap.childGroup;
this.childHandler = bootstrap.childHandler;
synchronized(bootstrap.childOptions) {
this.childOptions.putAll(bootstrap.childOptions);
}
this.childAttrs.putAll(bootstrap.childAttrs);
}
复制代码
二 正确使用锁
大家都知道使用锁的必要性,但对于锁使用的准确时机、锁的范围(粒度),还有锁之间如何协同的了解不够,这会导致使用出错、或者因为锁范围过大导致不必要的资源消耗,并且降低系统/执行效率。
java.util.concurrent 包内,有一个抽象类 ForkJoinTask。有这样一个场景,externalAwaitDone 方法,直到指定的条件满足时,才会继续执行,代码如下所示:
/**
* Blocks a non-worker-thread until completion.
* @return status upon completion
*/
private int externalAwaitDone() {
int s = ((this instanceof CountedCompleter) ? // try helping
ForkJoinPool.common.externalHelpComplete(
(CountedCompleter<?>)this, 0) :
ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);
if (s >= 0 && (s = status) >= 0) {
boolean interrupted = false;
do {
if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {
synchronized (this) {
if (status >= 0) {
try {
wait(0L);
} catch (InterruptedException ie) {
interrupted = true;
}
}
else
notifyAll();
}
}
} while ((s = status) >= 0);
if (interrupted)
Thread.currentThread().interrupt();
}
return s;
}
复制代码
其中,最重要的逻辑是 do... while 循环;并且在同步代码块下,有对 wait() 和 notifyAll() 方法的使用。这里面涉及几个关键点:
1)wait 方法用于让线程等待某个条件,且方法必须在同步块内部调用,否则会提示语法错误。使用示例如下:
2)需要在循环内调用 wait 方法,而不能在循环制外调用。尽管有可能不满足唤醒条件,但因为其他线程可能会调用 notifyAll()方法,这会使被阻塞的线程被意外唤醒。这样对锁保护约定的破坏会导致约束失败,从而导致无法预知的结果。
3)notify 和 notifyAll 都是唤醒线程的方法。当不确定应该调用哪个方法时,notifyAll 可以唤醒所有等待的线程。从优化角度来看,如果等待状态的线程都是在等待同一个条件,并且每次只有一个线程可以从这个条件被唤醒,那么应该使用 notify。
三 volatile
java 中,volatile 关键字可以认为是一个轻量级的同步机制。内存模型语义是保证线程可见,以及禁止指令重排序。
NioEventLoop 中,定义了一个私有 int 类型变量:ioRatio。这个变量没有注释,但从定义上看,是用于设置 I/O 执行时间比例的。
private volatile int ioRatio = 50;
public void setIoRatio(int ioRatio) {
if (ioRatio > 0 && ioRatio <= 100) {
this.ioRatio = ioRatio;
} else {
throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)");
}
}
复制代码
在 NioEventLoop 中,线程并没有调用这个设置方法,而是由外部发起设置动作。通常来说,会是业务线程调用这个方法,重新设置参数。 这种情况下,就形成了一个线程写,一个线程读的场景。在这样的场景之下,vplatile 可以用来替代 synchronize 以提升并发访问性能。
四 CAS 与原子类
synchronized 这类互斥同步,由于进行线程阻塞和唤醒,会带来较大的性能损耗,从而也被成为阻塞同步。从乐观/悲观锁的角度来说,是一种悲观的并发策略,所以属于悲观锁。相对地,非阻塞同步,也可以成为乐观锁。这种方法可以简单描述为,先进行操作,在操作完成后判断是否成功(是否有并发问题),如果有,就做失败补偿,如果没有就说明操作成功。通过这样的方式,在一些场景下避免了同步锁的弊端,降低了不必要的资源消耗。
CAS 就是这样的一种非阻塞同步实现方式。IA64 和 X86 这两种指令集下,都是通过 cmpxchg 指令来完成 CAS。其他系统中使用的命令不同。
Netty 中,ChannelOutboundBuffer 类,就有 CAS 的使用实例。为了统计发送的总字节数,类中定于了 totalPendingSize 变量用于记录字节数,这是一个 volatile 变量。我们已经明确,volatile 无法保证多线程并发修改的安全性,所以在类中又定义了一个 AtomicIntegerFieldUpdater 类型边领:TOTAL_PENDING_SIZE_UPDATER,通过它来实现 totalPendingSize 的院子更新。
AtomicIntegerFieldUpdater 是一个抽象类,注释说明如下:
/**
* A reflection-based utility that enables atomic updates to
* designated {@code volatile long} fields of designated classes.
* This class is designed for use in atomic data structures in which
* several fields of the same node are independently subject to atomic
* updates.
*
* <p>Note that the guarantees of the {@code compareAndSet}
* method in this class are weaker than in other atomic classes.
* Because this class cannot ensure that all uses of the field
* are appropriate for purposes of atomic access, it can
* guarantee atomicity only with respect to other invocations of
* {@code compareAndSet} and {@code set} on the same updater.
*
* @since 1.5
* @author Doug Lea
* @param <T> The type of the object holding the updatable field
*/
复制代码
另外:
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size != 0L) {
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize > (long)this.channel.config().getWriteBufferHighWaterMark()) {
this.setUnwritable(invokeLater);
}
}
}
复制代码
这里没有使用锁,而是用了 TOTAL_PENDING_SIZE_UPDATER 的 addAndGet 方法。继续查看这个 addAndGet 方法:
/**
* Atomically adds the given value to the current value of the field of
* the given object managed by this updater.
*
* @param obj An object whose field to get and set
* @param delta the value to add
* @return the updated value
*/
public long addAndGet(T obj, long delta) {
long prev, next;
do {
prev = get(obj);
next = prev + delta;
} while (!compareAndSet(obj, prev, next));
return next;
}
复制代码
底层是通过 compareAndSet()实现的。
对新旧值的操作,CAS 通常的使用方法就是:
1)先对 oldValue 进行更新,oldValue = totalPendingSize;
2)重新对更新值进行计算: newWriteBufferSize = oldValue + size;
3)继续循环进行 CAS,直到成功为止。
JDK 中提供的 Atomic 原子类,可以避免同步锁带来的并发访问性能额外损耗的问题,所以 Netty 中我们在很多地方都可以看到 int、long、boolean 等类型的变量使用对应的原子类的情况。这也是 Netty 高性能保障的一个重要来源。
评论