写点什么

源码分析 -Netty: 并发编程的实践(二)

发布于: 2021 年 03 月 14 日
源码分析-Netty: 并发编程的实践(二)

系列文章:

源码分析 -Netty:开篇

源码分析 -Netty:多线程在 Netty 中的应用


摘要

前面一篇介绍了多线程在 Netty 中的大概使用情况,本篇将结合源码,详细描述使用方式,以及值得我们思考、学习和借鉴的地方。

一 synchronized 使用

关键字 synchronized,我们在并发编程的艺术系列文章中有过描述,用于保证在同一时刻,只有一个线程能够执行某个方法或代码块。同步的作用,既有互斥,也有保证共享变量的可见性:当某个线程修改了变量值并释放锁后,其他线程可以立即获取被修改变量的最新值。

以 ServerBootstrap 进行分析。在类中,定义了一个 final 变量 childOptions,值为 LinkedHashMap。在构造方法中,对 bootstrap.childOptions 进行了同步。这是因为,LinkedHashMap 是非线程安全的,所以当存在多线程并发创建 ServerBootstrap 的实例时,访问和修改这个变量,必须在外部做好同步,否则会导致不可预料的后果。

private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap();
private ServerBootstrap(ServerBootstrap bootstrap) { super(bootstrap); this.childGroup = bootstrap.childGroup; this.childHandler = bootstrap.childHandler; synchronized(bootstrap.childOptions) { this.childOptions.putAll(bootstrap.childOptions); }
this.childAttrs.putAll(bootstrap.childAttrs); }
复制代码

二 正确使用锁

大家都知道使用锁的必要性,但对于锁使用的准确时机、锁的范围(粒度),还有锁之间如何协同的了解不够,这会导致使用出错、或者因为锁范围过大导致不必要的资源消耗,并且降低系统/执行效率。

java.util.concurrent 包内,有一个抽象类 ForkJoinTask。有这样一个场景,externalAwaitDone 方法,直到指定的条件满足时,才会继续执行,代码如下所示:

/**     * Blocks a non-worker-thread until completion.     * @return status upon completion     */    private int externalAwaitDone() {        int s = ((this instanceof CountedCompleter) ? // try helping                 ForkJoinPool.common.externalHelpComplete(                     (CountedCompleter<?>)this, 0) :                 ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0);        if (s >= 0 && (s = status) >= 0) {            boolean interrupted = false;            do {                if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) {                    synchronized (this) {                        if (status >= 0) {                            try {                                wait(0L);                            } catch (InterruptedException ie) {                                interrupted = true;                            }                        }                        else                            notifyAll();                    }                }            } while ((s = status) >= 0);            if (interrupted)                Thread.currentThread().interrupt();        }        return s;    }
复制代码

其中,最重要的逻辑是 do... while 循环;并且在同步代码块下,有对 wait() 和 notifyAll() 方法的使用。这里面涉及几个关键点:

1)wait 方法用于让线程等待某个条件,且方法必须在同步块内部调用,否则会提示语法错误。使用示例如下:


2)需要在循环内调用 wait 方法,而不能在循环制外调用。尽管有可能不满足唤醒条件,但因为其他线程可能会调用 notifyAll()方法,这会使被阻塞的线程被意外唤醒。这样对锁保护约定的破坏会导致约束失败,从而导致无法预知的结果。


3)notify 和 notifyAll 都是唤醒线程的方法。当不确定应该调用哪个方法时,notifyAll 可以唤醒所有等待的线程。从优化角度来看,如果等待状态的线程都是在等待同一个条件,并且每次只有一个线程可以从这个条件被唤醒,那么应该使用 notify。


三 volatile

java 中,volatile 关键字可以认为是一个轻量级的同步机制。内存模型语义是保证线程可见,以及禁止指令重排序。

NioEventLoop 中,定义了一个私有 int 类型变量:ioRatio。这个变量没有注释,但从定义上看,是用于设置 I/O 执行时间比例的。

private volatile int ioRatio = 50;
public void setIoRatio(int ioRatio) { if (ioRatio > 0 && ioRatio <= 100) { this.ioRatio = ioRatio; } else { throw new IllegalArgumentException("ioRatio: " + ioRatio + " (expected: 0 < ioRatio <= 100)"); } }
复制代码

在 NioEventLoop 中,线程并没有调用这个设置方法,而是由外部发起设置动作。通常来说,会是业务线程调用这个方法,重新设置参数。 这种情况下,就形成了一个线程写,一个线程读的场景。在这样的场景之下,vplatile 可以用来替代 synchronize 以提升并发访问性能。

四 CAS 与原子类

synchronized 这类互斥同步,由于进行线程阻塞和唤醒,会带来较大的性能损耗,从而也被成为阻塞同步。从乐观/悲观锁的角度来说,是一种悲观的并发策略,所以属于悲观锁。相对地,非阻塞同步,也可以成为乐观锁。这种方法可以简单描述为,先进行操作,在操作完成后判断是否成功(是否有并发问题),如果有,就做失败补偿,如果没有就说明操作成功。通过这样的方式,在一些场景下避免了同步锁的弊端,降低了不必要的资源消耗。

CAS 就是这样的一种非阻塞同步实现方式。IA64 和 X86 这两种指令集下,都是通过 cmpxchg 指令来完成 CAS。其他系统中使用的命令不同。

Netty 中,ChannelOutboundBuffer 类,就有 CAS 的使用实例。为了统计发送的总字节数,类中定于了 totalPendingSize 变量用于记录字节数,这是一个 volatile 变量。我们已经明确,volatile 无法保证多线程并发修改的安全性,所以在类中又定义了一个 AtomicIntegerFieldUpdater 类型边领:TOTAL_PENDING_SIZE_UPDATER,通过它来实现 totalPendingSize 的院子更新。

AtomicIntegerFieldUpdater 是一个抽象类,注释说明如下:

/** * A reflection-based utility that enables atomic updates to * designated {@code volatile long} fields of designated classes. * This class is designed for use in atomic data structures in which * several fields of the same node are independently subject to atomic * updates. * * <p>Note that the guarantees of the {@code compareAndSet} * method in this class are weaker than in other atomic classes. * Because this class cannot ensure that all uses of the field * are appropriate for purposes of atomic access, it can * guarantee atomicity only with respect to other invocations of * {@code compareAndSet} and {@code set} on the same updater. * * @since 1.5 * @author Doug Lea * @param <T> The type of the object holding the updatable field */
复制代码

另外:

    private void incrementPendingOutboundBytes(long size, boolean invokeLater) {        if (size != 0L) {            long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);            if (newWriteBufferSize > (long)this.channel.config().getWriteBufferHighWaterMark()) {                this.setUnwritable(invokeLater);            }
} }
复制代码

这里没有使用锁,而是用了 TOTAL_PENDING_SIZE_UPDATER 的 addAndGet 方法。继续查看这个 addAndGet 方法:

/**     * Atomically adds the given value to the current value of the field of     * the given object managed by this updater.     *     * @param obj An object whose field to get and set     * @param delta the value to add     * @return the updated value     */    public long addAndGet(T obj, long delta) {        long prev, next;        do {            prev = get(obj);            next = prev + delta;        } while (!compareAndSet(obj, prev, next));        return next;    }
复制代码

底层是通过 compareAndSet()实现的。

对新旧值的操作,CAS 通常的使用方法就是:

1)先对 oldValue 进行更新,oldValue = totalPendingSize;

2)重新对更新值进行计算: newWriteBufferSize = oldValue + size;

3)继续循环进行 CAS,直到成功为止。

JDK 中提供的 Atomic 原子类,可以避免同步锁带来的并发访问性能额外损耗的问题,所以 Netty 中我们在很多地方都可以看到 int、long、boolean 等类型的变量使用对应的原子类的情况。这也是 Netty 高性能保障的一个重要来源。


发布于: 2021 年 03 月 14 日阅读数: 22
用户头像

磨炼中成长,痛苦中前行 2017.10.22 加入

微信公众号【程序员架构进阶】。多年项目实践,架构设计经验。曲折中向前,分享经验和教训

评论

发布
暂无评论
源码分析-Netty: 并发编程的实践(二)