写点什么

深入理解 Netty FastThreadLocal

  • 2023-10-19
    广东
  • 本文字数:9491 字

    阅读完需:约 31 分钟

作者:vivo 互联网服务器团队- Jiang Zhu


本文以线上诡异问题为切入点,通过对比 JDK ThreadLocal 和 Netty FastThreadLocal 实现逻辑以及优缺点,并深入解读源码,由浅入深理解 Netty FastThreadLocal。

一、前言

最近在学习 Netty 相关的知识,在看到 Netty FastThreadLocal 章节中,回想起一起线上诡异问题。


  • 问题描述:外销业务获取用户信息判断是否支持 https 场景下,获取的用户信息有时候竟然是错乱的。

  • 问题分析:使用 ThreadLocal 保存用户信息时,未能及时进行 remove()操作,而 Tomcat 工作线程是基于线程池的,会出现线程重用情况,所以获取的用户信息可能是之前线程遗留下来的。

  • 问题修复:ThreadLocal 使用完之后及时 remove()、ThreadLocal 使用之前也进行 remove()双重保险操作。


接下来,我们继续深入了解下 JDK ThreadLocal 和 Netty FastThreadLocal 吧。

二、JDK ThreadLocal 介绍

ThreadLocal 是 JDK 提供的一个方便对象在本线程内不同方法中传递、获取的类。用它定义的变量,仅在本线程中可见,不受其他线程的影响,与其他线程相互隔离


那具体是如何实现的呢?如图 1 所示,每个线程都会有个 ThreadLocalMap 实例变量,其采用懒加载的方式进行创建,当线程第一次访问此变量时才会去创建。

ThreadLocalMap 使用线性探测法存储 ThreadLocal 对象及其维护的数据,具体操作逻辑如下:


  • 假设有一个新的 ThreadLocal 对象,通过 hash 计算它应存储的位置下标为 x。

  • 此时发现下标 x 对应位置已经存储了其他的 ThreadLocal 对象,则它会往后寻找,步长为 1,下标变更为 x+1。

  • 接下来发现下标 x+1 对应位置也已经存储了其他的 ThreadLocal 对象,同理则它会继续往后寻找,下标变更为 x+2。

  • 直到寻找到下标为 x+3 时发现是空闲的,然后将该 ThreadLocal 对象及其维护的数据构建一个 entry 对象存储在 x+3 位置。


在 ThreadLocalMap 中数据很多的情况下,很容易出现 hash 冲突,解决冲突需要不断的向下遍历,该操作的时间复杂度为 O(n),效率较低

图 1


从下面的代码中可以看出:

Entry 的 key 是弱引用,value 是强引用。在 JVM 垃圾回收时,只要发现弱引用的对象,不管内存是否充足,都会被回收。


但是当 ThreadLocal 不再使用被 GC 回收后,ThreadLocalMap 中可能出现 Entry 的 key 为 NULL,那么 Entry 的 value 一直会强引用数据而得不到释放,只能等待线程销毁,从而造成内存泄漏

static class ThreadLocalMap {    // 弱引用,在资源紧张的时候可以回收部分不再引用的ThreadLocal变量    static class Entry extends WeakReference<ThreadLocal<?>> {        // 当前ThreadLocal对象所维护的数据        Object value;         Entry(ThreadLocal<?> k, Object v) {            super(k);            value = v;        }    }    // 省略其他代码}
复制代码

综上所述,既然 JDK 提供的 ThreadLocal 可能存在效率较低和内存泄漏的问题,为啥不做相应的优化和改造呢?

  1. 从 ThreadLocal 类注释看,它是 JDK1.2 版本引入的,早期可能不太关注程序的性能。

  2. 大部分多线程场景下,线程中的 ThreadLocal 变量较少,因此出现 hash 冲突的概率相对较小,及时偶尔出现了 hash 冲突,对程序的性能影响也相对较小。

  3. 对于内存泄漏问题,ThreadLocal 本身已经做了一定的保护措施。作为使用者,在线程中某个 ThreadLocal 对象不再使用或出现异常时,立即调用 remove() 方法删除 Entry 对象,养成良好的编码习惯。

三、Netty FastThreadLocal 介绍

FastThreadLocal 是 Netty 中对 JDK 提供的 ThreadLocal 优化改造版本,从名称上来看,它应该比 ThreadLocal 更快了,以应对 Netty 处理并发量大、数据吞吐量大的场景。


那具体是如何实现的呢?如图 2 所示,每个线程都会有个 InternalThreadLocalMap 实例变量。

每个 FastThreadLocal 实例创建时,都会采用 AtomicInteger 保证顺序递增生成一个不重复的下标 index,它是该 FastThreadLocal 对象维护的数据应该存储的位置。


读写数据的时候通过 FastThreadLocal 的下标 index 直接定位到该 FastThreadLocal 的位置,时间复杂度为 O(1),效率较高。


如果该下标 index 递增到特别大,InternalThreadLocalMap 维护的数组也会特别大,所以 FastThreadLocal 是通过空间换时间来提升读写性能的。

图 2

四、Netty FastThreadLocal 源码分析

4.1 构造方法

public class FastThreadLocal<V> {    // FastThreadLocal中的index是记录了该它维护的数据应该存储的位置    // InternalThreadLocalMap数组中的下标, 它是在构造函数中确定的    private final int index;     public InternalThreadLocal() {        index = InternalThreadLocalMap.nextVariableIndex();    }    // 省略其他代码}
复制代码


public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {    // 自增索引, ⽤于计算下次存储到Object数组中的位置    private static final AtomicInteger nextIndex = new AtomicInteger();     private static final int ARRAY_LIST_CAPACITY_MAX_SIZE = Integer.MAX_VALUE - 8;     public static int nextVariableIndex() {        int index = nextIndex.getAndIncrement();        if (index >= ARRAY_LIST_CAPACITY_MAX_SIZE || index < 0) {            nextIndex.set(ARRAY_LIST_CAPACITY_MAX_SIZE);            throw new IllegalStateException("too many thread-local indexed variables");        }        return index;    }    // 省略其他代码}
复制代码

上面这两段代码在 Netty FastThreadLocal 介绍中已经讲解过,这边就不再重复介绍了。

4.2 get 方法

public class FastThreadLocal<V> {    // FastThreadLocal中的index是记录了该它维护的数据应该存储的位置    private final int index;     public final V get() {        // 获取当前线程的InternalThreadLocalMap        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();        // 根据当前线程的index从InternalThreadLocalMap中获取其绑定的数据        Object v = threadLocalMap.indexedVariable(index);        // 如果获取当前线程绑定的数据不为缺省值UNSET,则直接返回;否则进行初始化        if (v != InternalThreadLocalMap.UNSET) {            return (V) v;        }         return initialize(threadLocalMap);    }    // 省略其他代码}
复制代码


public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {    private static final int INDEXED_VARIABLE_TABLE_INITIAL_SIZE = 32;     // 未赋值的Object变量(缺省值),当⼀个与线程绑定的值被删除之后,会被设置为UNSET    public static final Object UNSET = new Object();     // 存储绑定到当前线程的数据的数组    private Object[] indexedVariables;     // slowThreadLocalMap为JDK ThreadLocal存储InternalThreadLocalMap    private static final ThreadLocal<InternalThreadLocalMap> slowThreadLocalMap =            new ThreadLocal<InternalThreadLocalMap>();     // 从绑定到当前线程的数据的数组中取出index位置的元素    public Object indexedVariable(int index) {        Object[] lookup = indexedVariables;        return index < lookup.length? lookup[index] : UNSET;    }     public static InternalThreadLocalMap get() {        Thread thread = Thread.currentThread();        // 判断当前线程是否是FastThreadLocalThread类型        if (thread instanceof FastThreadLocalThread) {            return fastGet((FastThreadLocalThread) thread);        } else {            return slowGet();        }    }     private static InternalThreadLocalMap fastGet(FastThreadLocalThread thread) {        // 直接获取当前线程的InternalThreadLocalMap        InternalThreadLocalMap threadLocalMap = thread.threadLocalMap();        // 如果当前线程的InternalThreadLocalMap还未创建,则创建并赋值        if (threadLocalMap == null) {            thread.setThreadLocalMap(threadLocalMap = new InternalThreadLocalMap());        }        return threadLocalMap;    }     private static InternalThreadLocalMap slowGet() {        // 使用JDK ThreadLocal获取InternalThreadLocalMap        InternalThreadLocalMap ret = slowThreadLocalMap.get();        if (ret == null) {            ret = new InternalThreadLocalMap();            slowThreadLocalMap.set(ret);        }        return ret;    }     private InternalThreadLocalMap() {        indexedVariables = newIndexedVariableTable();    }     // 初始化一个32位长度的Object数组,并将其元素全部设置为缺省值UNSET    private static Object[] newIndexedVariableTable() {        Object[] array = new Object[INDEXED_VARIABLE_TABLE_INITIAL_SIZE];        Arrays.fill(array, UNSET);        return array;    }    // 省略其他代码}
复制代码

源码中 get() 方法主要分为下面 3 个步骤处理:

  1. 通过 InternalThreadLocalMap.get()方法获取当前线程的 InternalThreadLocalMap。

  2. 根据当前线程的 index 从 InternalThreadLocalMap 中获取其绑定的数据。

  3. 如果不是缺省值 UNSET,直接返回;如果是缺省值,则执行 initialize 方法进行初始化。


下面我们继续分析一下 InternalThreadLocalMap.get()方法的实现逻辑。

  1. 首先判断当前线程是否是 FastThreadLocalThread 类型,如果是 FastThreadLocalThread 类型则直接使用 fastGet 方法获取 InternalThreadLocalMap,如果不是 FastThreadLocalThread 类型则使用 slowGet 方法获取 InternalThreadLocalMap 兜底处理。

  2. 兜底处理中的 slowGet 方法会退化成 JDK 原生的 ThreadLocal 获取 InternalThreadLocalMap。

  3. 获取 InternalThreadLocalMap 时,如果为 null,则会直接创建一个 InternalThreadLocalMap 返回。其创建过过程中初始化一个 32 位长度的 Object 数组,并将其元素全部设置为缺省值 UNSET。

4.3 set 方法

public class FastThreadLocal<V> {    // FastThreadLocal初始化时variablesToRemoveIndex被赋值为0    private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();     public final void set(V value) {        // 判断value值是否是未赋值的Object变量(缺省值)        if (value != InternalThreadLocalMap.UNSET) {            // 获取当前线程对应的InternalThreadLocalMap            InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();            // 将InternalThreadLocalMap中数据替换为新的value            // 并将FastThreadLocal对象保存到待清理的Set中            setKnownNotUnset(threadLocalMap, value);        } else {            remove();        }    }     private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {        // 将InternalThreadLocalMap中数据替换为新的value        if (threadLocalMap.setIndexedVariable(index, value)) {            // 并将当前的FastThreadLocal对象保存到待清理的Set中            addToVariablesToRemove(threadLocalMap, this);        }    }     private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {        // 取下标index为0的数据,用于存储待清理的FastThreadLocal对象Set集合中        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);        Set<FastThreadLocal<?>> variablesToRemove;        if (v == InternalThreadLocalMap.UNSET || v == null) {            // 下标index为0的数据为空,则创建FastThreadLocal对象Set集合            variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());            // 将InternalThreadLocalMap中下标为0的数据,设置成FastThreadLocal对象Set集合            threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);        } else {            variablesToRemove = (Set<FastThreadLocal<?>>) v;        }        // 将FastThreadLocal对象保存到待清理的Set中        variablesToRemove.add(variable);    }    // 省略其他代码}
复制代码


public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {    // 未赋值的Object变量(缺省值),当⼀个与线程绑定的值被删除之后,会被设置为UNSET    public static final Object UNSET = new Object();    // 存储绑定到当前线程的数据的数组    private Object[] indexedVariables;    // 绑定到当前线程的数据的数组能再次采用x2扩容的最大量    private static final int ARRAY_LIST_CAPACITY_EXPAND_THRESHOLD = 1 << 30;    private static final int ARRAY_LIST_CAPACITY_MAX_SIZE = Integer.MAX_VALUE - 8;     // 将InternalThreadLocalMap中数据替换为新的value    public boolean setIndexedVariable(int index, Object value) {        Object[] lookup = indexedVariables;        if (index < lookup.length) {            Object oldValue = lookup[index];            // 直接将数组 index 位置设置为 value,时间复杂度为 O(1)            lookup[index] = value;            return oldValue == UNSET;        } else { // 绑定到当前线程的数据的数组需要扩容,则扩容数组并数组设置新value            expandIndexedVariableTableAndSet(index, value);            return true;        }    }     private void expandIndexedVariableTableAndSet(int index, Object value) {        Object[] oldArray = indexedVariables;        final int oldCapacity = oldArray.length;        int newCapacity;        // 判断可进行x2方式进行扩容        if (index < ARRAY_LIST_CAPACITY_EXPAND_THRESHOLD) {            newCapacity = index;            // 位操作,提升扩容效率            newCapacity |= newCapacity >>>  1;            newCapacity |= newCapacity >>>  2;            newCapacity |= newCapacity >>>  4;            newCapacity |= newCapacity >>>  8;            newCapacity |= newCapacity >>> 16;            newCapacity ++;        } else { // 不支持x2方式扩容,则设置绑定到当前线程的数据的数组容量为最大值            newCapacity = ARRAY_LIST_CAPACITY_MAX_SIZE;        }        // 按扩容后的大小创建新数组,并将老数组数据copy到新数组        Object[] newArray = Arrays.copyOf(oldArray, newCapacity);        // 新数组扩容后的部分赋UNSET缺省值        Arrays.fill(newArray, oldCapacity, newArray.length, UNSET);        // 新数组的index位置替换成新的value        newArray[index] = value;        // 绑定到当前线程的数据的数组用新数组替换        indexedVariables = newArray;    }    // 省略其他代码}
复制代码

源码中 set() 方法主要分为下面 3 个步骤处理:

  1. 判断 value 是否是缺省值 UNSET,如果 value 不等于缺省值,则会通过 InternalThreadLocalMap.get()方法获取当前线程的 InternalThreadLocalMap,具体实现 3.2 小节中 get()方法已做讲解。

  2. 通过 FastThreadLocal 中的 setKnownNotUnset()方法将 InternalThreadLocalMap 中数据替换为新的 value,并将当前的 FastThreadLocal 对象保存到待清理的 Set 中。

  3. 如果等于缺省值 UNSET 或 null(else 的逻辑),会调用 remove()方法,remove()具体见后面的代码分析。


接下来我们看下 InternalThreadLocalMap.setIndexedVariable 方法的实现逻辑。

  1. 判断 index 是否超出存储绑定到当前线程的数据的数组 indexedVariables 的长度,如果没有超出,则获取 index 位置的数据,并将该数组 index 位置数据设置新 value。

  2. 如果超出了,绑定到当前线程的数据的数组需要扩容,则扩容该数组并将它 index 位置的数据设置新 value。

  3. 扩容数组以 index 为基准进行扩容,将数组扩容后的容量向上取整为 2 的次幂。然后将原数组内容拷贝到新的数组中,空余部分填充缺省值 UNSET,最终把新数组赋值给 indexedVariables。


下面我们再继续看下 FastThreadLocal.addToVariablesToRemove 方法的实现逻辑。

  1. 取下标 index 为 0 的数据(用于存储待清理的 FastThreadLocal 对象 Set 集合中),如果该数据是缺省值 UNSET 或 null,则会创建 FastThreadLocal 对象 Set 集合,并将该 Set 集合填充到下标 index 为 0 的数组位置。

  2. 如果该数据不是缺省值 UNSET,说明 Set 集合已金被填充,直接强转获取该 Set 集合。

  3. 最后将 FastThreadLocal 对象保存到待清理的 Set 集合中。

4.4 remove、removeAll 方法

public class FastThreadLocal<V> {    // FastThreadLocal初始化时variablesToRemoveIndex被赋值为0    private static final int variablesToRemoveIndex = InternalThreadLocalMap.nextVariableIndex();     public final void remove() {        // 获取当前线程的InternalThreadLocalMap        // 删除当前的FastThreadLocal对象及其维护的数据        remove(InternalThreadLocalMap.getIfSet());    }     public final void remove(InternalThreadLocalMap threadLocalMap) {        if (threadLocalMap == null) {            return;        }         // 根据当前线程的index,并将该数组下标index位置对应的值设置为缺省值UNSET        Object v = threadLocalMap.removeIndexedVariable(index);        // 存储待清理的FastThreadLocal对象Set集合中删除当前FastThreadLocal对象        removeFromVariablesToRemove(threadLocalMap, this);         if (v != InternalThreadLocalMap.UNSET) {            try {                // 空方法,用户可以继承实现                onRemoval((V) v);            } catch (Exception e) {                PlatformDependent.throwException(e);            }        }    }     public static void removeAll() {        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();        if (threadLocalMap == null) {            return;        }         try {            // 取下标index为0的数据,用于存储待清理的FastThreadLocal对象Set集合中            Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);            if (v != null && v != InternalThreadLocalMap.UNSET) {                @SuppressWarnings("unchecked")                Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;                // 遍历所有的FastThreadLocal对象并删除它们以及它们维护的数据                FastThreadLocal<?>[] variablesToRemoveArray =                        variablesToRemove.toArray(new FastThreadLocal[0]);                for (FastThreadLocal<?> tlv: variablesToRemoveArray) {                    tlv.remove(threadLocalMap);                }            }        } finally {            // 删除InternalThreadLocalMap中threadLocalMap和slowThreadLocalMap数据            InternalThreadLocalMap.remove();        }    }     private static void removeFromVariablesToRemove(            InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {        // 取下标index为0的数据,用于存储待清理的FastThreadLocal对象Set集合中        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);         if (v == InternalThreadLocalMap.UNSET || v == null) {            return;        }         @SuppressWarnings("unchecked")        // 存储待清理的FastThreadLocal对象Set集合中删除该FastThreadLocal对象        Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;        variablesToRemove.remove(variable);    }     // 省略其他代码}
复制代码


public final class InternalThreadLocalMap extends UnpaddedInternalThreadLocalMap {     // 根据当前线程获取InternalThreadLocalMap       public static InternalThreadLocalMap getIfSet() {        Thread thread = Thread.currentThread();        if (thread instanceof FastThreadLocalThread) {            return ((FastThreadLocalThread) thread).threadLocalMap();        }        return slowThreadLocalMap.get();    }     // 数组下标index位置对应的值设置为缺省值UNSET    public Object removeIndexedVariable(int index) {        Object[] lookup = indexedVariables;        if (index < lookup.length) {            Object v = lookup[index];            lookup[index] = UNSET;            return v;        } else {            return UNSET;        }    }     // 删除threadLocalMap和slowThreadLocalMap数据    public static void remove() {        Thread thread = Thread.currentThread();        if (thread instanceof FastThreadLocalThread) {            ((FastThreadLocalThread) thread).setThreadLocalMap(null);        } else {            slowThreadLocalMap.remove();        }    }    // 省略其他代码}
复制代码

源码中 remove() 方法主要分为下面 2 个步骤处理:

  1. 通过 InternalThreadLocalMap.getIfSet()获取当前线程的 InternalThreadLocalMap。具体和 3.2 小节 get()方法里面获取当前线程的 InternalThreadLocalMap 相似,这里就不再重复介绍了。

  2. 删除当前的 FastThreadLocal 对象及其维护的数据。


源码中 removeAll() 方法主要分为下面 3 个步骤处理:

  1. 通过 InternalThreadLocalMap.getIfSet()获取当前线程的 InternalThreadLocalMap。

  2. 取下标 index 为 0 的数据(用于存储待清理的 FastThreadLocal 对象 Set 集合),然后遍历所有的 FastThreadLocal 对象并删除它们以及它们维护的数据。

  3. 最后会将 InternalThreadLocalMap 本身从线程中移除。

五、总结

那么使用 ThreadLocal 时最佳实践又如何呢?

每次使用完 ThreadLocal 实例,在线程运行结束之前的 finally 代码块中主动调用它的 remove()方法,清除 Entry 中的数据,避免操作不当导致的内存泄漏。


使⽤Netty 的 FastThreadLocal 一定比 JDK 原生的 ThreadLocal 更快吗?

不⼀定。当线程是 FastThreadLocalThread,则添加、获取 FastThreadLocal 所维护数据的时间复杂度是 O(1),⽽使⽤ThreadLocal 可能存在哈希冲突,相对来说使⽤FastThreadLocal 更⾼效。但如果是普通线程则可能更慢。


使⽤FastThreadLocal 有哪些优点?

正如文章开头介绍 JDK 原生 ThreadLocal 存在的缺点,FastThreadLocal 全部优化了,它更⾼效、而且如果使⽤的是 FastThreadLocal,它会在任务执⾏完成后主动调⽤removeAll⽅法清除数据,避免潜在的内存泄露。

发布于: 刚刚阅读数: 6
用户头像

官方公众号:vivo互联网技术,ID:vivoVMIC 2020-07-10 加入

分享 vivo 互联网技术干货与沙龙活动,推荐最新行业动态与热门会议。

评论

发布
暂无评论
深入理解 Netty FastThreadLocal_性能优化_vivo互联网技术_InfoQ写作社区