一、前言
在工作中,多线程访问同一个共享变量时存在并发问题,要么给这个共享变量加锁,要么将变量私有化,能不加锁就不加锁,ThreadLocal
就是让每个线程访问自己的本地变量来避免并发问题。
ThreadLocal
在日常工作中用的很频繁,比如数据库连接、session、cookie 等线程级缓存;面试中也经常被问到,ThreadLocal
的实现原理是什么?为什么会发生内存泄漏?如何解决?
二、ThreadLocal 基本结构关系
代码示例:
ThreadLocal<String> threadLocal = new ThreadLocal<String>();
try {
threadLocal.set("xxx");
threadLocal.get();
} finally {
threadLocal.remove();
}
复制代码
其实很简单,也就 3 个方法。深入到源码发现,每个Thread
维护一个ThreadLocal.ThreadLocalMap
类型的变量threadLocals
,想必这就是存放key-value
的容器了。
public class Thread implements Runnable {
... ...
ThreadLocal.ThreadLocalMap threadLocals = null;
... ...
}
复制代码
而ThreadLocalMap
是ThreadLocal
内部类,ThreadLocalMap
内部又有一个继承了WeakReference
的内部类Entry
,存放key-valued
的,从继承关系看key
是ThreadLocal
类型的对象引用,且是弱引用(记住key
是弱引用很关键)。
(ThreadLocalMap
就相当于一个简易版的HashMap
,了解HashMap
的构造,看这个就很简单了,基本思想都有,容量必须是 2 的整数次方,有扩容,哈希映射,解决哈希冲突的方式开放寻址法等。)
public class ThreadLocal<T> {
... ...
static class ThreadLocalMap {
static class Entry extends WeakReference<ThreadLocal<?>> {
Object value;
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}
/**
* The initial capacity -- MUST be a power of two.
*/
private static final int INITIAL_CAPACITY = 16;
/**
* The table, resized as necessary.
* table.length MUST always be a power of two.
*/
private Entry[] table;
/**
* The number of entries in the table.
*/
private int size = 0;
/**
* The next size value at which to resize.
*/
private int threshold; // Default to 0
/**
* Set the resize threshold to maintain at worst a 2/3 load factor.
*/
private void setThreshold(int len) {
// 扩容因子2/3
threshold = len * 2 / 3;
}
... ...
}
... ...
}
复制代码
三、set
获取当前线程的ThreadLocalMap
,不为 null 则正常设置,否则初始化ThreadLocalMap
并设置值。
// java.lang.ThreadLocal#set
public void set(T value) {
Thread t = Thread.currentThread();
// 获取当前线程的ThreadLocalMap
// ThreadLocalMap是ThreadLocal的内部类,作为每个线程的一个变量
ThreadLocalMap map = getMap(t);
if (map != null)
// ThreadLocal当前实例对象引用作为ThreadLocalMap的key(弱引用)
map.set(this, value);
else
// map=null,第一次设置,则初始化
createMap(t, value);
}
ThreadLocalMap getMap(Thread t) {
return t.threadLocals;
}
复制代码
1、初始化 ThreadLocalMap
若 map=null,第一次设置,则先初始化,初始容量为 16,且不能从外部更改,扩容因子为 2/3。
void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
复制代码
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
table = new Entry[INITIAL_CAPACITY];
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
table[i] = new Entry(firstKey, firstValue);
size = 1;
setThreshold(INITIAL_CAPACITY);
}
private void setThreshold(int len) {
threshold = len * 2 / 3;
}
复制代码
2、设置值
当前线程的map
,不为 null,则调用map#set(ThreadLocal<?> key, Object value)
,ThreadLocal
实例对象引用作为 key:
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
// 哈希映射
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
// 解决哈希冲突的办法:开放寻址
e = tab[i = nextIndex(i, len)]) {
// i位置e!=null 哈希冲突
ThreadLocal<?> k = e.get();
if (k == key) {
// key相等,替换
e.value = value;
return;
}
// k=null,
if (k == null) {
// 替换旧的entry,可防止内存泄漏
// 清理过时数据的过程比较乐观,但又想方设法的去清理过时数据
// 为何乐观和按比例扫描,应该是怕太耗时,影响set的性能吧
replaceStaleEntry(key, value, i);
return;
}
}
// 没有发生hash冲突,在i位置新建新entry
tab[i] = new Entry(key, value);
int sz = ++size;
// cleanSomeSlots按比例扫描,返回结果是boolean,true为清理了过时数据,false没有
// 若没有清理旧数据!false 且 数量达到扩容阈值,则扩容
// 若有清理旧数据!true,(也是为了避免不必要的扩容)
if (!cleanSomeSlots(i, sz) && sz >= threshold)
// 扩容
rehash();
}
复制代码
(1)首先获取key
的hash
值,并与Entry[]
做映射,找到需要设置的位置i
。计算 key 的哈希值也贼简单,维护一个AtomicInteger
变量,每次哈希都加HASH_INCREMENT
。因为数组容量是 2 的整数次,所以可以用高效的&
运算来代替模运算。
private final int threadLocalHashCode = nextHashCode();
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
// AtomicInteger 每次加HASH_INCREMENT
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
复制代码
(2)找到i
了,就看看i
位置是否是空的,不为空则说明发生哈希冲突了,开放寻址,向后遍历寻找空闲的位置。
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}
复制代码
倘若遍历到 key 相等的位置,则新值替换旧值,若遍历到位置 k=null,说明弱引用 k 被gc
回收了,value 还被强引用着(一定是代码不规范,忘记调用remove
了),这就有可能造成内存泄漏,所以替换(replaceStaleEntry
)掉过时的entry
(k=null,entry!=null 则为过时entry
)。
replaceStaleEntry:
这个替换也不是简单的替换,还会扫描替换位置staleSlot
的前面是否还有位置(slotToExpunge
)是过时的entry
需要清理,同时也会扫描staleSlot
后的位置(i
)是否有旧 k 和新 key 相等的位置,若有则交换,同时判断是否需要更新slotToExpunge
。
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;
// staleSlot位置,key=null,就是一个应该被替换和删除的旧值
// 扫描staleSlot前面是否有需要删除的位置slotToExpunge
// 直到遇到一个位置是空的停止扫描,也是一种比较乐观的扫描,认为后面没有可清除的entry
int slotToExpunge = staleSlot;
for (int i = prevIndex(staleSlot, len);
(e = tab[i]) != null;
i = prevIndex(i, len))
// 上一个e的key为null,
if (e.get() == null)
slotToExpunge = i;
// 这个循环为的是扫描staleSlot后面是否存在key相等的槽,有则staleSlot和i交换
for (int i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
if (k == key) {
// 找到key相等,新旧交换
e.value = value;
tab[i] = tab[staleSlot];
tab[staleSlot] = e;
// 删除slotToExpunge位置的值
// Start expunge at preceding stale entry if it exists
if (slotToExpunge == staleSlot)
slotToExpunge = i;
// 删除slotToExpunge位置的值,并扫描清理其他过时entry
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}
// 这里也可以看出slotToExpunge的作用就是想找到除staleSlot外是否还有其他位置需求清理
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}
// If key not found, put new entry in stale slot
// 没有找到相等的key,则staleSlot位置put new entry
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);
// If there are any other stale entries in run, expunge them
// 如果有其他需要清除的entry,则清理
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
复制代码
最后slotToExpunge != staleSlot
,说明还有其他位置过时了,需要清理,则调用cleanSomeSlots
。
调用cleanSomeSlots
会先调用expungeStaleEntry
,这两个函数的关系是expungeStaleEntry
是删除指定位置的entry
,而cleanSomeSlots
是按比例扫描其他位置是否还有需要清理的过时entry
。
expungeStaleEntry:
删除指定位置的entry
,同时还会向后遍历是否还有其他过时的entry
,同时还会对未过时的entry
再哈希,使得数组中的位置比较连贯整齐。返回值是被删除位置下一个空位置的索引。
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;
// expunge entry at staleSlot
tab[staleSlot].value = null;
tab[staleSlot] = null;
size--;
// Rehash until we encounter null
Entry e;
int i;
// 从staleSlot开始向后遍历,不为空的位置检查是否k==null
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
// 遍历数组,key为null,清除,防止内存泄漏
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
// key不为null,重新hash,整理一下数组中剩余的元素
int h = k.threadLocalHashCode & (len - 1);
if (h != i) {
tab[i] = null;
// Unlike Knuth 6.4 Algorithm R, we must scan until
// null because multiple entries could have been stale.
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
// the index of the next null slot after staleSlot
// all between staleSlot and this slot will have been checked
// for expunging
return i;
}
复制代码
cleanSomeSlots:
cleanSomeSlots
的目的还是想尽可能多的清理过时的entry
,第一个参数i
一定不是需要清理的位置,开放寻址i
后面是否还有其他位置是过时需要清理的。但是这个过程可能会影响 set 的性能,时间复杂度为 O(n),所以不太可能全表扫描,而是按比例扫描。
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
// cleanSomeSlots目的是想清除所有过时的entry,
// 但是这个过程可能会导致set比较耗时,O(n)的时间复杂度
// 所以按比例扫描,清除过时entry,
// len=16,16/2=8,8/2=4,4/2=2,2/2=1,1/2=0,扫描5次
// 这个比例并不是均匀的
do {
i = nextIndex(i, len);
Entry e = tab[i];
if (e != null && e.get() == null) {
// 重置n为 len
n = len;
removed = true;
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0);
return removed;
}
复制代码
(3)找到i
位置是空的说明没有发生哈希冲突。i 位置新建Entry
,并 size+1。
tab[i] = new Entry(key, value);
int sz = ++size;
复制代码
(4)判断是否需要扩容
扩容需要满足两个条件,元素数量size
达到扩容阈值threshold
,cleanSomeSlots
过程没有清理数据,这也是为了避免不必要的扩容。
一次只能加一个元素,前面++size
之后,刚好达到threshold
,即size+1 >=threshold
满足,若cleanSomeSlots
有清理过时数据,则size+1-x
一定是小于threshold
的,所以不需要扩容。
if (!cleanSomeSlots(i, sz) && sz >= threshold)
// 扩容
rehash();
复制代码
若需要扩容,则调用 rehash(),扩容前还需要全表扫描一遍清理过时数据,如果这样都不能使空闲位置增多,则扩容resize
,
private void rehash() {
// 扩容前清除过时数据
expungeStaleEntries();
// Use lower threshold for doubling to avoid hysteresis
// 2/3 - 2/3*1/4= 1/2 到最后扩容阈值成了1/2,为了编码滞后?没看懂。。。
if (size >= threshold - threshold / 4)
resize();
}
复制代码
resize
扩容,新建一个数组,长度为旧数组的 2 倍,然后遍历旧数组再哈希复制到新数组。
/**
* Double the capacity of the table.
* copy on write,只能保证最终一致性
*/
private void resize() {
Entry[] oldTab = table;
int oldLen = oldTab.length;
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen];
int count = 0;
for (int j = 0; j < oldLen; ++j) {
Entry e = oldTab[j];
if (e != null) {
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null; // Help the GC
} else {
int h = k.threadLocalHashCode & (newLen - 1);
while (newTab[h] != null)
h = nextIndex(h, newLen);
newTab[h] = e;
count++;
}
}
}
// 设置新阈值
setThreshold(newLen);
size = count;
table = newTab;
}
复制代码
四、get
分析完 set,get 就简单很多了。既然解决哈希冲突用的开放寻址,若哈希映射找到的i
位置不是要找的值,则需要向后寻址查找,若在这个遍历的过程中有遇到过时的数据,则调用expungeStaleEntry
清除,在一定程度上可避免内存泄漏。
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
// 没有找到,做初始化操作
return setInitialValue();
}
复制代码
private Entry getEntry(ThreadLocal<?> key) {
// hash映射,找到i,若i位置不是要找的元素,则需要开放寻址
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
// 开放寻址
return getEntryAfterMiss(key, i, e);
}
复制代码
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;
// 开放寻址找 key相等的value
while (e != null) {
ThreadLocal<?> k = e.get();
if (k == key)
return e;
// 如若遍历到k为null的,执行删除操作,防止内存泄漏
if (k == null)
expungeStaleEntry(i);
else
i = nextIndex(i, len);
e = tab[i];
}
return null;
}
复制代码
五、remove
remove
最简单,删除以ThreadLocal
实例对象引用为 key 的值即可,key
设置为 null,value
设置为 null,这样就可以断开value
的强引用,使得gc
可以回收。
public void remove() {
ThreadLocalMap m = getMap(Thread.currentThread());
if (m != null)
m.remove(this);
}
复制代码
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
// key 设置为null
e.clear();
// 删除旧的entry,value=null
expungeStaleEntry(i);
return;
}
}
}
//java.lang.ref.Reference#clear
public void clear() {
this.referent = null;
}
复制代码
六、内存泄漏问题
在使用ThreadLocal
时,一定听说了,使用完一定要调用remove
,不然会内存泄漏。
1、内存泄漏演示
可以先模拟下,不调用remove
和调用remove
的堆内存使用情况。
public class ThreadLocalTest {
public static void main(String[] args) throws InterruptedException {
ThreadLocal<User> threadLocal = new ThreadLocal<>();
try {
threadLocal.set(new User());
threadLocal.get();
} finally {
// threadLocal.remove();
}
TimeUnit.MINUTES.sleep(5);
}
static class User {
private byte[] datas = new byte[1024*1024*100];
}
private static ThreadLocal<User> threadLocal = new ThreadLocal<>();
}
复制代码
运行上面代码,打开 jdk 自带的Java/jdk1.8.0_241/binjvisualvm.exe
查看堆内存使用情况:
(1)如下图是没有调用remove
的堆内存使用情况,可以看到点击执行垃圾回收后,依然会有 100MB 左右的的垃圾没有回收,点了几次都不会回收,这就是内存泄漏了。
(2)如下图是调用了remove
的堆内存使用情况,点击执行垃圾回收后,堆内存几乎接近 0,说明基本都回收了。
2、内存泄漏的原因
不调用remove
为什么会出现内存泄漏呢?认真读源码就会发现,ThreadLocal
的实例对象引用作为内部类ThreadLocalMap
的 key,而这个 key 是弱引用(弱引用会在下次垃圾回收时,被回收掉),
所以key
就会在下次垃圾回收时回收了,而value
的强引用还在,只要当前线程不销毁,value
就一直不会被垃圾回收,所以就导致内存泄漏了。
如何避免内存泄漏呢,当然是规范写代码,一定记得remove
呀。正确调用remove
的姿势:
3、set 和 get 官方避免内存泄漏
ThreadLocal
源码中set
和get
操作时会在一定程度上清理过时数据(key=null),这也是为了防止内存泄漏吧,但是并不能确保一定能清除掉所有的过时数据,所以不要太指望set
和get
,还是老老实实主动调用remove
吧。
七、总结
ThreadLocal
让每个线程访问自己本地的变量来确保线程安全。
每一个线程维护一个ThreadLocalMap
,简易版的HashMap
,key
为ThradLocal
实例引用,解决哈希冲突的方式是开放寻址法,所以不适合存大量数据。
get 和 set 过程中都会清理过时的数据,但是不一定会清理掉所有的过时数据,不然会影响正常操作性能。
使用完ThreadLocal
一定要调用remove
,不然会内存泄漏,因为 key 是弱引用会被下次 gc,而 value 的强引用会一直存在,若线程一直不销毁,value 就一直不 gc 掉,导致内存泄漏。
PS: 如若文章中有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。我是徐同学,愿与你共同进步!
评论