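Over the next few posts, let's talk about netty. I want to start with FastThreadLocal rather than the topics people probably know better, such as the reactor model, the chain-of-responsibility design, ByteBuf, or pooling. FastThreadLocal may be less familiar than those, but there is a lot to it. The core questions: why is it fast, and which problems of the JDK's ThreadLocal does it solve?
Translated literally, ThreadLocal means "local to a thread"; I usually call it a thread variable. It has been part of the JDK since 1.2.
The obvious question is: why is it fast? It takes some nerve to put "Fast" in front of the JDK's ThreadLocal!
Since it is called FastThreadLocal, we should first look at roughly how ThreadLocal is implemented.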
/**
 * This class provides thread-local variables.  These variables differ from
 * their normal counterparts in that each thread that accesses one (via its
 * {@code get} or {@code set} method) has its own, independently initialized
 * copy of the variable.  {@code ThreadLocal} instances are typically private
 * static fields in classes that wish to associate state with a thread (e.g.,
 * a user ID or Transaction ID).
 */
public T get() {
    Thread t = Thread.currentThread();
    ThreadLocalMap map = getMap(t);
    if (map != null) {
        // Unlike HashMap, which chains colliding entries, ThreadLocalMap uses open addressing,
        // because its authors, Josh Bloch and Doug Lea, expected a thread's map to hold only a few entries
        ThreadLocalMap.Entry e = map.getEntry(this);
        if (e != null) {
            @SuppressWarnings("unchecked")
            T result = (T)e.value;
            return result;
        }
    }
    return setInitialValue();
}

ThreadLocalMap getMap(Thread t) {
    return t.threadLocals;
}
static class ThreadLocalMap {

    /**
     * The entries in this hash map extend WeakReference, using
     * its main ref field as the key (which is always a
     * ThreadLocal object).  Note that null keys (i.e. entry.get()
     * == null) mean that the key is no longer referenced, so the
     * entry can be expunged from table.  Such entries are referred to
     * as "stale entries" in the code that follows.
     */
    static class Entry extends WeakReference<ThreadLocal<?>> {
        /** The value associated with this ThreadLocal. */
        Object value;

        Entry(ThreadLocal<?> k, Object v) {
            super(k);
            value = v;
        }
    }

    /**
     * The initial capacity -- MUST be a power of two.
     */
    private static final int INITIAL_CAPACITY = 16;

    /**
     * The table, resized as necessary.
     * table.length MUST always be a power of two.
     */
    private Entry[] table;
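To make the "each thread has its own copy" wording in the javadoc concrete, here is a minimal sketch. The class name `ThreadLocalDemo`, the `NEXT_ID` counter and the helper `readFromNewThread` are my own names for illustration, not part of the JDK; only `ThreadLocal.withInitial`, `get` and the `Thread` calls are real JDK API.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class ThreadLocalDemo {
    // Hypothetical counter, used only to hand out a distinct initial value per thread.
    private static final AtomicInteger NEXT_ID = new AtomicInteger(1);

    // Every thread that calls get() receives its own independently initialized copy.
    private static final ThreadLocal<Integer> THREAD_ID =
            ThreadLocal.withInitial(NEXT_ID::getAndIncrement);

    /** Reads the thread-local from a brand-new thread and returns what that thread saw. */
    static int readFromNewThread() {
        int[] seen = new int[1];
        Thread t = new Thread(() -> seen[0] = THREAD_ID.get());
        t.start();
        try {
            t.join(); // join() makes the write to seen[0] visible to the caller
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        int mine = THREAD_ID.get();
        int other = readFromNewThread();
        System.out.println(mine != other);           // the two threads hold different copies
        System.out.println(mine == THREAD_ID.get()); // repeated reads on one thread are stable
    }
}
```

The value lives in the `threadLocals` map of whichever thread calls `get()`, which is why the lookup starts from `Thread.currentThread()`.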
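To set up what follows, let's look at how ThreadLocal reclaims resources.
First, Entry extends WeakReference, so once a ThreadLocal key is no longer strongly referenced it can be garbage collected, leaving behind a stale entry whose key is null.
Second, set also contains cleanup logic. Here is the map's set method: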
private void set(ThreadLocal<?> key, Object value) {

    // We don't use a fast path as with get() because it is at
    // least as common to use set() to create new entries as
    // it is to replace existing ones, in which case, a fast
    // path would fail more often than not.

    Entry[] tab = table;
    int len = tab.length;
    int i = key.threadLocalHashCode & (len-1);

    for (Entry e = tab[i];
         e != null;
         e = tab[i = nextIndex(i, len)]) {
        ThreadLocal<?> k = e.get();

        if (k == key) {
            e.value = value;
            return;
        }

        if (k == null) {
            replaceStaleEntry(key, value, i);
            return;
        }
    }

    tab[i] = new Entry(key, value);
    int sz = ++size;
    if (!cleanSomeSlots(i, sz) && sz >= threshold)
        rehash();
}
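The probing loop in set is easier to follow in isolation. Below is a deliberately simplified sketch of open addressing with linear probing. `ProbingTable` is a hypothetical class of mine, not JDK code: it takes the hash as an explicit argument so the collision is reproducible, and it leaves out weak references, stale-entry cleanup and resizing.

```java
final class ProbingTable {
    private final Object[] keys;
    private final Object[] values;
    private int size;

    ProbingTable(int capacity) {
        // A power-of-two capacity makes (hash & (length - 1)) a valid slot index.
        if (capacity < 2 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two >= 2");
        }
        keys = new Object[capacity];
        values = new Object[capacity];
    }

    private int nextIndex(int i) {
        return (i + 1 < keys.length) ? i + 1 : 0; // wrap around at the end of the array
    }

    /** Stores the value and returns the slot that was used. */
    int set(Object key, int hash, Object value) {
        int i = hash & (keys.length - 1);
        while (keys[i] != null) {
            if (keys[i] == key) {      // identity comparison, as in ThreadLocalMap
                values[i] = value;     // existing key: overwrite in place
                return i;
            }
            i = nextIndex(i);          // slot taken by another key: probe the next one
        }
        if (size + 1 == keys.length) { // keep one slot empty so probing always terminates
            throw new IllegalStateException("table full (this sketch does not resize)");
        }
        keys[i] = key;
        values[i] = value;
        size++;
        return i;
    }

    Object get(Object key, int hash) {
        int i = hash & (keys.length - 1);
        while (keys[i] != null) {
            if (keys[i] == key) {
                return values[i];
            }
            i = nextIndex(i);
        }
        return null;
    }
}
```

Two keys that hash to the same slot end up in neighbouring slots instead of in a linked list, which is cheap as long as the table stays small and sparse.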
Enter FastThreadLocal
/**
 * A special variant of {@link ThreadLocal} that yields higher access performance when accessed from a
 * {@link FastThreadLocalThread}.
 * <p>
 * Internally, a {@link FastThreadLocal} uses a constant index in an array, instead of using hash code and hash table,
 * to look for a variable.  Although seemingly very subtle, it yields slight performance advantage over using a hash
 * table, and it is useful when accessed frequently.
 * </p><p>
 * To take advantage of this thread-local variable, your thread must be a {@link FastThreadLocalThread} or its subtype.
 * By default, all threads created by {@link DefaultThreadFactory} are {@link FastThreadLocalThread} due to this reason.
 * </p><p>
 * Note that the fast path is only possible on threads that extend {@link FastThreadLocalThread}, because it requires
 * a special field to store the necessary state.  An access by any other kind of thread falls back to a regular
 * {@link ThreadLocal}.
 * </p>
 */

public FastThreadLocal() {
    index = InternalThreadLocalMap.nextVariableIndex();
}
/**
 * Set the value for the current thread.
 */
public final void set(V value) {
    if (value != InternalThreadLocalMap.UNSET) {
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        setKnownNotUnset(threadLocalMap, value);
    } else {
        remove();
    }
}

// In InternalThreadLocalMap
public static InternalThreadLocalMap get() {
    Thread thread = Thread.currentThread();
    if (thread instanceof FastThreadLocalThread) {
        return fastGet((FastThreadLocalThread) thread);
    } else {
        return slowGet();
    }
}

/**
 * @return see {@link InternalThreadLocalMap#setIndexedVariable(int, Object)}.
 */
private void setKnownNotUnset(InternalThreadLocalMap threadLocalMap, V value) {
    if (threadLocalMap.setIndexedVariable(index, value)) {
        addToVariablesToRemove(threadLocalMap, this);
    }
}

@SuppressWarnings("unchecked")
private static void addToVariablesToRemove(InternalThreadLocalMap threadLocalMap, FastThreadLocal<?> variable) {
    Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
    Set<FastThreadLocal<?>> variablesToRemove;
    if (v == InternalThreadLocalMap.UNSET || v == null) {
        variablesToRemove = Collections.newSetFromMap(new IdentityHashMap<FastThreadLocal<?>, Boolean>());
        threadLocalMap.setIndexedVariable(variablesToRemoveIndex, variablesToRemove);
    } else {
        variablesToRemove = (Set<FastThreadLocal<?>>) v;
    }

    variablesToRemove.add(variable);
}
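The javadoc's point, "a constant index in an array, instead of using hash code and hash table", can be sketched in a few lines. This is not netty's implementation: `IndexedLocal`, `NEXT_INDEX`, `SLOTS` and the initial length of 8 are names and numbers I made up. For simplicity the per-thread array is held in a plain JDK ThreadLocal, much like the fallback the javadoc describes for ordinary threads; on the fast path netty keeps this state in a field of FastThreadLocalThread instead.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

final class IndexedLocal<V> {
    private static final Object UNSET = new Object();
    // Global counter: every variable gets a fixed slot index when it is constructed.
    private static final AtomicInteger NEXT_INDEX = new AtomicInteger();

    // One slot array per thread (assumption of this sketch: held in a JDK ThreadLocal).
    private static final ThreadLocal<Object[]> SLOTS =
            ThreadLocal.withInitial(() -> newSlots(8));

    private final int index = NEXT_INDEX.getAndIncrement();

    private static Object[] newSlots(int length) {
        Object[] slots = new Object[length];
        Arrays.fill(slots, UNSET);
        return slots;
    }

    int index() {
        return index;
    }

    void set(V value) {
        Object[] slots = SLOTS.get();
        if (index >= slots.length) {
            // Grow the array so that this variable's slot exists, marking new slots as UNSET.
            int oldLength = slots.length;
            slots = Arrays.copyOf(slots, Math.max(index + 1, oldLength * 2));
            Arrays.fill(slots, oldLength, slots.length, UNSET);
            SLOTS.set(slots);
        }
        slots[index] = value; // plain array store: no hashing, no probing
    }

    @SuppressWarnings("unchecked")
    V get() {
        Object[] slots = SLOTS.get();
        Object v = index < slots.length ? slots[index] : UNSET;
        return v == UNSET ? null : (V) v;
    }
}
```

Because the index is assigned once and never changes, a lookup is a bounds check plus an array read, with no collisions to resolve.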
The code above already touched on cleanup and mentioned the set of FastThreadLocals pending cleanup. So when does this set get cleaned up?
/**
 * Removes all {@link FastThreadLocal} variables bound to the current thread.  This operation is useful when you
 * are in a container environment, and you don't want to leave the thread local variables in the threads you do not
 * manage.
 */
public static void removeAll() {
    InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.getIfSet();
    if (threadLocalMap == null) {
        return;
    }

    try {
        // Fetch the set of FastThreadLocals pending cleanup
        Object v = threadLocalMap.indexedVariable(variablesToRemoveIndex);
        if (v != null && v != InternalThreadLocalMap.UNSET) {
            @SuppressWarnings("unchecked")
            Set<FastThreadLocal<?>> variablesToRemove = (Set<FastThreadLocal<?>>) v;
            FastThreadLocal<?>[] variablesToRemoveArray =
                    variablesToRemove.toArray(new FastThreadLocal[0]);
            for (FastThreadLocal<?> tlv: variablesToRemoveArray) {
                tlv.remove(threadLocalMap);
            }
        }
    } finally {
        InternalThreadLocalMap.remove();
    }
}

/**
 * Sets the value to uninitialized for the specified thread local map;
 * a proceeding call to get() will trigger a call to initialValue().
 * The specified thread local map must be for the current thread.
 */
@SuppressWarnings("unchecked")
public final void remove(InternalThreadLocalMap threadLocalMap) {
    if (threadLocalMap == null) {
        return;
    }

    Object v = threadLocalMap.removeIndexedVariable(index);
    // Remove this variable from the set of FastThreadLocals pending cleanup
    removeFromVariablesToRemove(threadLocalMap, this);

    if (v != InternalThreadLocalMap.UNSET) {
        try {
            onRemoval((V) v);
        } catch (Exception e) {
            PlatformDependent.throwException(e);
        }
    }
}
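addToVariablesToRemove is the add-to-set logic shown earlier.
removeFromVariablesToRemove is called from io.netty.util.concurrent.FastThreadLocal#remove(io.netty.util.internal.InternalThreadLocalMap) to take the variable out of the set of FastThreadLocals pending cleanup, since it has already been cleaned up.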
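One detail worth calling out in removeAll is that it copies the set into an array before looping, because each remove call takes the variable out of that same set. The sketch below isolates that pattern. `RemovalRegistry` and `Variable` are hypothetical stand-ins I wrote for illustration; only the `Collections.newSetFromMap(new IdentityHashMap<...>())` construction is taken from the netty code above.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;

final class RemovalRegistry {
    /** Hypothetical stand-in for a thread variable that knows how to clean itself up. */
    static final class Variable {
        final String name;

        Variable(String name) {
            this.name = name;
        }
    }

    // Identity-based set: membership is decided by ==, never by equals()/hashCode().
    private final Set<Variable> pending =
            Collections.newSetFromMap(new IdentityHashMap<Variable, Boolean>());
    private final List<String> removed = new ArrayList<>();

    void register(Variable v) {
        pending.add(v);
    }

    /** Cleans up one variable; note that this mutates the pending set. */
    void remove(Variable v) {
        if (pending.remove(v)) {
            removed.add(v.name);
        }
    }

    void removeAll() {
        // Snapshot first: iterating `pending` directly while remove() mutates it
        // can fail with a ConcurrentModificationException.
        Variable[] snapshot = pending.toArray(new Variable[0]);
        for (Variable v : snapshot) {
            remove(v);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    List<String> removed() {
        return removed;
    }
}
```

The identity set also means a variable is registered at most once per thread, however many times set is called on it.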
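On cleanup, let's compare this with the JDK's native ThreadLocal. Clearly, netty provides removeAll to dispose of every thread variable bound to a thread. The intent is that netty cares whether the bound thread variables are cleaned up promptly once the thread is finished, so that they do not leak memory. But you can also see that netty's approach still has to be invoked by hand, so why not do it automatically?
Versions of netty before 4.1.27.Final used an object called ObjectCleaner. The class itself is still around, but the logic that used ObjectCleaner to clean up thread variables was commented out and finally deleted in netty-4.1.35.Final. Briefly, the old idea was that the set method registered a cleanup task. It relied on the semantics of java.lang.ref.WeakReference#WeakReference(T, java.lang.ref.ReferenceQueue<? super T>), the parent-class constructor of AutomaticCleanerReference: after the object T is reclaimed, the reference is added to the ReferenceQueue. When the first cleanup task is registered, the cleaner starts a background thread running CLEANER_TASK, which loops pulling references from this ReferenceQueue; whenever it gets one, it runs the cleanup task associated with it (the one passed to the AutomaticCleanerReference constructor).
So why did netty drop this logic? The gist of the upstream issue is that the cleaner thread cannot be stopped or controlled, which may leak the variables that the thread references.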
private void registerCleaner(final InternalThreadLocalMap threadLocalMap) {
    Thread current = Thread.currentThread();
    if (FastThreadLocalThread.willCleanupFastThreadLocals(current) || // the thread is a FastThreadLocalThread that was constructed with a Runnable
        threadLocalMap.indexedVariable(cleanerFlagIndex) != InternalThreadLocalMap.UNSET) { // already registered
        return;
    }
    // removeIndexedVariable(cleanerFlagIndex) isn't necessary because the finally cleanup is tied to the lifetime
    // of the thread, and this Object will be discarded if the associated thread is GCed.
    threadLocalMap.setIndexedVariable(cleanerFlagIndex, Boolean.TRUE); // set the flag to avoid registering twice

    // We will need to ensure we will trigger remove(InternalThreadLocalMap) so everything will be released
    // and FastThreadLocal.onRemoval(...) will be called.
    // That is, register an object cleaner for each FastThreadLocal, so that when the thread is destroyed
    // the thread's variable map gets cleaned up
    ObjectCleaner.register(current, new Runnable() {
        @Override
        public void run() {
            remove(threadLocalMap);

            // It's fine to not call InternalThreadLocalMap.remove() here as this will only be triggered once
            // the Thread is collected by GC. In this case the ThreadLocal will be gone away already.
        }
    });
}
private static final Runnable CLEANER_TASK = new Runnable() {
    @Override
    public void run() {
        boolean interrupted = false;
        for (;;) {
            // Keep on processing as long as the LIVE_SET is not empty and once it becomes empty
            // See if we can let this thread complete.
            while (!LIVE_SET.isEmpty()) {
                final AutomaticCleanerReference reference;
                try {
                    reference = (AutomaticCleanerReference) REFERENCE_QUEUE.remove(REFERENCE_QUEUE_POLL_TIMEOUT_MS);
                } catch (InterruptedException ex) {
                    // Just consume and move on
                    interrupted = true;
                    continue;
                }
                if (reference != null) {
                    try {
                        reference.cleanup();
                    } catch (Throwable ignored) {
                        // ignore exceptions, and don't log in case the logger throws an exception, blocks, or has
                        // other unexpected side effects.
                    }
                    LIVE_SET.remove(reference);
                }
            }
            CLEANER_RUNNING.set(false);

            // Its important to first access the LIVE_SET and then CLEANER_RUNNING to ensure correct
            // behavior in multi-threaded environments.
            if (LIVE_SET.isEmpty() || !CLEANER_RUNNING.compareAndSet(false, true)) {
                // There was nothing added after we set STARTED to false or some other cleanup Thread
                // was started already so its safe to let this Thread complete now.
                break;
            }
        }
        if (interrupted) {
            // As we caught the InterruptedException above we should mark the Thread as interrupted.
            Thread.currentThread().interrupt();
        }
    }
};
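The WeakReference plus ReferenceQueue mechanism behind the old cleaner can be shown with the JDK alone. The sketch below is a rough approximation under my own names (`CleanerSketch`, `CleanupReference`, `drainOne`, `demo`), not netty's ObjectCleaner. Since garbage collection timing is not deterministic, `demo` calls `enqueue()` by hand to simulate the collector putting the reference on the queue; in real use the GC does that step.

```java
import java.lang.ref.ReferenceQueue;
import java.lang.ref.WeakReference;

final class CleanerSketch {
    /** A weak reference that carries the task to run once its referent is gone. */
    static final class CleanupReference extends WeakReference<Object> {
        private final Runnable cleanupTask;

        CleanupReference(Object referent, ReferenceQueue<Object> queue, Runnable cleanupTask) {
            super(referent, queue);
            this.cleanupTask = cleanupTask;
        }

        void cleanup() {
            cleanupTask.run();
        }
    }

    /**
     * Waits up to timeoutMs (must be > 0, since 0 would block forever) for one enqueued
     * reference and runs its cleanup task. Returns true if a task ran.
     */
    static boolean drainOne(ReferenceQueue<Object> queue, long timeoutMs) {
        try {
            CleanupReference ref = (CleanupReference) queue.remove(timeoutMs);
            if (ref == null) {
                return false; // timed out, nothing was enqueued
            }
            ref.cleanup();
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    /** Registers a cleanup task, simulates collection of the referent, and drains the queue. */
    static boolean demo() {
        ReferenceQueue<Object> queue = new ReferenceQueue<>();
        boolean[] cleaned = new boolean[1];
        Object referent = new Object();
        CleanupReference ref = new CleanupReference(referent, queue, () -> cleaned[0] = true);
        ref.enqueue(); // simulation only: normally the GC enqueues the reference
        drainOne(queue, 100);
        return cleaned[0];
    }
}
```

A background thread that calls something like `drainOne` in a loop is the part that is hard to stop or control, which is the concern raised in the issue mentioned above.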
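Starting from the JDK's native ThreadLocal, this post covered why FastThreadLocal is faster and how its cleanup logic was changed to avoid leaking memory through thread variables. Next time we will keep talking about netty. See you then! Missing Jiabao~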