并发系列之ThreadLocal

并发系列之ThreadLocal

参考链接:https://www.cnblogs.com/micrari/p/6790229.html

线程封闭

在《Java并发编程实战》一书中提到,“当访问共享的可变数据时,通常需要使用同步。一种避免使用同步的方式就是不共享数据”。因此提出了“线程封闭“的概念,一种经常使用线程封闭的应用场景就是JDBC的Connection,通过线程封闭技术,可以把连接对象封闭到某个线程内部,从而避免出现多个线程共享同一个连接的情况。而线程封闭总共有三种类型的呈现形式:

(1)Ad-hoc线程封闭。维护线程封闭性的职责有程序实现来承担,然而这种实现方式是脆弱的;

(2)栈封闭。实际上通过尽量使用局部变量的方式,避免其他线程获取数据;

(3)ThreadLocal类。ThreadLocal类顾名思义可以理解为本地线程本地变量。也就是说如果定义一个ThreadLocal,每个线程往这个ThreadLocal中读写是线程隔离,互相之间不会影响的。它提供了一种将可变数据通过每个线程有自己的独立副本从而实现线程封闭的机制。

ThreadLocal工作原理

1:Thread类中有一个成员变量属于ThreadLocalMap类(一个定义在ThreadLocal类中的内部类),它是一个Map,他的key是ThreadLocal实例对象。

2:当为ThreadLocal类的对象set值时,首先获得当前线程的ThreadLocalMap类属性,然后以ThreadLocal类的对象为key,设定value。get值时则类似。每个线程在往某个ThreadLocal里塞值的时候,都会往自己的ThreadLocalMap里存,读也是以某个ThreadLocal作为引用,在自己的map里找对应的key,从而实现了线程隔离。

3:ThreadLocal变量的活动范围为某线程,是该线程“专有的,独自霸占”的,对该变量的所有操作均由该线程完成!也就是说,ThreadLocal不是用来解决共享对象的多线程访问的竞争问题的,因为ThreadLocal.set()到线程中对象是该线程自己使用的对象,其他线程是不需要访问的,也访问不到的。当线程终止后,这些值会作为垃圾回收。

源码解析

先来看一下ThreadLocal的结构图

ThreadLocal结构图

ThreadLocal里面有一个内部类——ThreadLocalMap,而它的get、set、remove等方法都是对ThreadLocalMap进行操作的,所以我们直接看一下这个ThreadLocalMap类。

ThreadLocalMap结构图

ThreadLocalMap结构图

存储结构
1
2
3
4
5
6
7
8
9
static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;// 实际保存的值

Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}

Entry就是ThreadLocalMap里面定义的节点,继承了WeakReference类,key为ThreadLocal,value为实际存放的值。之所以说是简单视作,因为实际上ThreadLocal中存放的是ThreadLocal的弱引用。

为什么要用弱引用

因为如果这里使用普通的key-value形式来定义存储结构,实质上就会造成节点的生命周期与线程强绑定,只要线程没有销毁,那么节点在GC分析中一直储于可达状态,没办法回收,而程序本身也无法判断是否可以清理节点。弱引用是Java中四档引用的第三档,比软引用更加弱一些,如果一个对象没有强引用链可达,那么一般活不过下一次GC。当某个ThreadLocal已经没有强引用可达,则随着它被垃圾回收,在ThreadLocalMap里对应的Entry的键值会失效,这为ThreadLocal本身的垃圾清理提供了便利。

变量定义
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// 初始容量 —— 必须是2的幂次
private static final int INITIAL_CAPACITY = 16;

// Entry表 —— 必须是2的幂次
private Entry[] table;

// Entry表中entry的个数
private int size = 0;

// 哈希表扩容阈值,默认为0
private int threshold;

// 设置resize阈值以维持最坏2/3的装载因子
private void setThreshold(int len) {
threshold = len * 2 / 3;
}

// 环形意义的下一个索引
private static int nextIndex(int i, int len) {
return ((i + 1 < len) ? i + 1 : 0);
}

// 环形意义的上一个索引
private static int prevIndex(int i, int len) {
return ((i - 1 >= 0) ? i - 1 : len - 1);
}

ThreadLocal有两个方法用于得到上一个/下一个索引,这里实际上是环形意义下的上一个与下一个。

由于ThreadLocalMap使用线性探测法来解决散列冲突,所以实际上Entry[]数组在程序逻辑上是作为一个环形存在的。

至此,我们已经可以大致勾勒出ThreadLocalMap的内部存储结构。下面是我绘制的示意图。虚线表示弱引用,实线表示强引用。

ThreadLocalMap内部存储结构

ThreadLocalMap维护了Entry环形数组,数组中元素Entry的逻辑上的key为某个ThreadLocal对象(实际上是指向该ThreadLocal对象的弱引用),value为代码中该线程往该ThreadLoacl变量实际塞入的值。

构造方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* 构造一个包含firstKey和FirstValue的map。
* ThreadLocalMap是惰性构造的,所以只有当至少要往里面放一个元素的时候才会构造它
*/
ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
// 初始化table数组
table = new Entry[INITIAL_CAPACITY];
// 计算hash值
int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
// 初始化此节点
table[i] = new Entry(firstKey, firstValue);
// 设置节点表大小为1
size = 1;
// 设定扩容阈值
setThreshold(INITIAL_CAPACITY);
}
哈希函数

重点看一下上面构造函数中的int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);这一行代码。

ThreadLocal类中有一个被final修饰的类型为int的threadLocalHashCode,它在该ThreadLocal被构造的时候就会生成,相当于一个ThreadLocal的ID,而它的值来源于

1
2
3
4
5
6
7
8
/*
* 生成hash code间隙为这个魔数,可以让生成出来的值或者说ThreadLocal的ID较为均匀地分布在2的幂大小的数组中。
*/
private static final int HASH_INCREMENT = 0x61c88647;

private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}

可以看出,它是在上一个被构造出的ThreadLocal的ID/threadLocalHashCode的基础上加上一个魔数0x61c88647的。这个魔数的选取与斐波那契散列有关,0x61c88647对应的十进制为1640531527。斐波那契散列的乘数可以用(long) ((1L << 31) * (Math.sqrt(5) - 1))可以得到2654435769,如果把这个值给转为带符号的int,则会得到-1640531527。换句话说
(1L << 32) - (long) ((1L << 31) * (Math.sqrt(5) - 1))得到的结果就是1640531527也就是0x61c88647。通过理论与实践,当我们用0x61c88647作为魔数累加为每个ThreadLocal分配各自的ID也就是threadLocalHashCode再与2的幂取模,得到的结果分布很均匀。
ThreadLocalMap使用的是线性探测法,均匀分布的好处在于很快就能探测到下一个临近的可用slot,从而保证效率。这就回答了上文抛出的为什么大小要为2的幂的问题。为了优化效率。

对于& (INITIAL_CAPACITY - 1),相信有过算法竞赛经验或是阅读源码较多的程序员,一看就明白,对于2的幂作为模数取模,可以用&(2^n-1)来替代%2^n,位运算比取模效率高很多。至于为什么,因为对2^n取模,只要不是低n位对结果的贡献显然都是0,会影响结果的只能是低n位。

get()
1
2
3
4
5
6
7
8
9
10
11
12
13
public T get() {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue();
}

get()实际上就是对ThreadLocalMap的getEntry()进行操作

getEntry()
1
2
3
4
5
6
7
8
9
10
11
private Entry getEntry(ThreadLocal<?> key) {
// 根据key这个ThreadLocal的ID来获取索引,也即哈希值
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
// 对应的entry存在且未失效且弱引用指向的ThreadLocal就是key,则命中返回
if (e != null && e.get() == key)
return e;
else
// 因为是线性探测,所以往后找还是有可能能够找到目标Entry的。
return getEntryAfterMiss(key, i, e);
}
getEntryAfterMiss() —— 调用getEntry未直接命中的时候调用此方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;

// 基于线性探测法不断向后探测直到遇到空entry
while (e != null) {
ThreadLocal<?> k = e.get();
// 找到目标
if (k == key)
return e;
if (k == null)
// 该entry对应的ThreadLocal一级被回收,调用expungeStaleEntry来清理无效的entry
expungeStaleEntry(i);
else
// 环形意义下往后面走
i = nextIndex(i, len);
e = tab[i];
}
return null;
}
expungeStaleEntry() —— 核心清理函数

expungeStaleEntry()函数是ThreadLocal中核心清理函数,它做的事情很简单:

  • 就是从staleSlot开始遍历,将无效(弱引用指向对象被回收)清理,即对应entry中的value置为null,将指向这个entry的table[i]置为null,直到扫到空entry。
  • 另外,在过程中还会对非空的entry作rehash。
  • 可以说这个函数的作用就是从staleSlot开始清理连续段中的slot(断开强引用,rehash slot等)。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
private int expungeStaleEntry(int staleSlot) {
Entry[] tab = table;
int len = tab.length;

// 因为entry对应的ThreadLocal已经被回收,value设为null,显式断开强引用
tab[staleSlot].value = null;
// 显式设置该entry为null,以便垃圾回收
tab[staleSlot] = null;
size--;

// Rehash until we encounter null
Entry e;
int i;
for (i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();
// 清理对应的ThreadLocal已经回收的entry
if (k == null) {
e.value = null;
tab[i] = null;
size--;
} else {
/**
* 对于还没有被回收的情况,需要做一次rehash
* 如果对应的ThreadLocal的ID对len取模出来的索引h不为当前位置i
* 则从h向后线性探测到第一个空的slot,把当前的entry给挪过去。
*/
int h = k.threadLocalHashCode & (len - 1);
if (h != i) {
tab[i] = null;

/*
* 在原代码的这里有句注释值得一提,原注释如下:
*
* Unlike Knuth 6.4 Algorithm R, we must scan until
* null because multiple entries could have been stale.
*
* 这段话提及了Knuth高德纳的著作TAOCP(《计算机程序设计艺术》)的6.4章节(散列)
* 中的R算法。R算法描述了如何从使用线性探测的散列表中删除一个元素。
* R算法维护了一个上次删除元素的index,当在非空连续段中扫到某个entry的哈希值取模后的索引
* 还没有遍历到时,会将该entry挪到index那个位置,并更新当前位置为新的index,
* 继续向后扫描直到遇到空的entry。
*
* ThreadLocalMap因为使用了弱引用,所以其实每个slot的状态有三种也即
* 有效(value未回收),无效(value已回收),空(entry==null)。
* 正是因为ThreadLocalMap的entry有三种状态,所以不能完全套高德纳原书的R算法。
*
* 因为expungeStaleEntry函数在扫描过程中还会对无效slot清理将之转为空slot,
* 如果直接套用R算法,可能会出现具有相同哈希值的entry之间断开(中间有空entry)。
*/
while (tab[h] != null)
h = nextIndex(h, len);
tab[h] = e;
}
}
}
// 返回stateSlot之后第一个空的slot索引
return i;
}

我们来回顾一下从ThreadLocal读一个值可能遇到的情况:

根据入参threadLocal的threadLocalHashCode对表容量取模得到index

  • 如果index对应的slot就是要读的threadLocal,则直接返回结果
  • 调用getEntryAfterMiss线性探测,过程中每碰到无效slot,调用expungeStaleEntry进行段清理;如果找到了key,则返回结果entry
  • 没有找到key,返回null
set()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void set(ThreadLocal<?> key, Object value) {

// We don't use a fast path as with get() because it is at
// least as common to use set() to create new entries as
// it is to replace existing ones, in which case, a fast
// path would fail more often than not.

Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1);

// 线性探测
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
// 找到对应的entry
if (k == key) {
e.value = value;
return;
}
// 替换失效的entry
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}

tab[i] = new Entry(key, value);
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}
replaceStaleEntry()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;

// 向前扫描,查找最前的一个无效slot
for (int i = prevIndex(staleSlot, len);
(e = tab[i]) != null;
i = prevIndex(i, len))
if (e.get() == null)
slotToExpunge = i;

// 向后遍历table
for (int i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();

// 找到了key, 将其与无效的slot交换
if (k == key) {
// 更新对应slot的value值
e.value = value;

tab[i] = tab[staleSlot];
tab[staleSlot] = e;

/*
* 如果在整个扫描过程中(包括函数一开始的向前扫描与i之前的向后扫描)
* 找到了之前的无效slot则以那个位置作为清理的起点
* 否则则以当前的i作为清理起点
*/
if (slotToExpunge == staleSlot)
slotToExpunge = i;
// 从slotToExpunge开始做一次连续段的清理,再做一次启发式清理
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}

// 如果当前的slot已经无效,并且向前扫描过程中没有无效slot,则更新slotToExpunge为当前位置
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}

// 如果key在table中不存在,则在原地放一个即可
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);

// 在探测过程中如果发现任何无效slot,则做一次清理(连续段清理 + 启发式清理)
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}
cleanSomeSlots() —— 启发式地清理slot
  • i对应entry是非无效(指向的ThreadLocal没被回收,或者entry本身为空)
  • n是用于控制控制扫描次数的
  • 正常情况下如果log n次扫描没有发现无效slot,函数就结束了
  • 但是如果发现了无效的slot,将n置为table的长度len,做一次连续段的清理
  • 再从下一个空的slot开始继续扫描
  • 这个函数有两处地方会被调用,一处是插入的时候可能会被调用,另外个是在替换无效slot的时候可能会被调用,
  • 区别是前者传入的n为元素个数,后者为table的容量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private boolean cleanSomeSlots(int i, int n) {
boolean removed = false;
Entry[] tab = table;
int len = tab.length;
do {
// i在任何情况下自己都不会是一个无效slot,所以从下一个开始判断
i = nextIndex(i, len);
Entry e = tab[i];
if (e != null && e.get() == null) {
// 扩大扫描控制因子
n = len;
removed = true;
// 清理一个连续段
i = expungeStaleEntry(i);
}
} while ( (n >>>= 1) != 0);
return removed;
}
rehash()
1
2
3
4
5
6
7
8
9
10
11
12
private void rehash() {
// 做一次全量清理
expungeStaleEntries();

/*
* 因为做了一次清理,所以size很可能会变小。
* ThreadLocalMap这里的实现是调低阈值来判断是否需要扩容,
* threshold默认为len*2/3,所以这里的threshold - threshold / 4相当于len/2
*/
if (size >= threshold - threshold / 4)
resize();
}
expungeStaleEntries()
1
2
3
4
5
6
7
8
9
10
11
12
13
private void expungeStaleEntries() {
Entry[] tab = table;
int len = tab.length;
for (int j = 0; j < len; j++) {
Entry e = tab[j];
if (e != null && e.get() == null)
/*
* 个人觉得这里可以取返回值,如果大于j的话取了用,这样也是可行的。
* 因为expungeStaleEntry执行过程中是把连续段内所有无效slot都清理了一遍了。
*/
expungeStaleEntry(j);
}
}
resize() —— 扩容
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private void resize() {
Entry[] oldTab = table;
int oldLen = oldTab.length;
int newLen = oldLen * 2;
Entry[] newTab = new Entry[newLen];
int count = 0;

for (int j = 0; j < oldLen; ++j) {
Entry e = oldTab[j];
if (e != null) {
ThreadLocal<?> k = e.get();
if (k == null) {
e.value = null; // Help the GC
} else {
// 线性探测来存放Entry
int h = k.threadLocalHashCode & (newLen - 1);
while (newTab[h] != null)
h = nextIndex(h, newLen);
newTab[h] = e;
count++;
}
}
}

setThreshold(newLen);
size = count;
table = newTab;
}

我们来回顾一下ThreadLocal的set方法可能会有的情况

  • 探测过程中slot都不无效,并且顺利找到key所在的slot,直接替换即可
  • 探测过程中发现有无效slot,调用replaceStaleEntry,效果是最终一定会把key和value放在这个slot,并且会尽可能清理无效slot
    • 在replaceStaleEntry过程中,如果找到了key,则做一个swap把它放到那个无效slot中,value置为新值
    • 在replaceStaleEntry过程中,没有找到key,直接在无效slot原地放entry
  • 探测没有发现key,则在连续段末尾的后一个空位置放上entry,这也是线性探测法的一部分。放完后,做一次启发式清理,如果没清理出去key,并且当前table大小已经超过阈值了,则做一次rehash,rehash函数会调用一次全量清理slot方法也即expungeStaleEntries,如果完了之后table大小超过了threshold - threshold / 4,则进行扩容2倍

可以发现,set和get如果冲突严重的话,效率很低,因为ThreadLocalMap是Thread的一个属性,所以即使在自己代码中设置的元素个数,打还是不能控制其他代码的行为。

remove()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 从map中删除ThreadLocal
*/
private void remove(ThreadLocal<?> key) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len - 1);
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
if (e.get() == key) {
// 显式断开弱引用
e.clear();
// 进行段清理
expungeStaleEntry(i);
return;
}
}
}

remove方法相对于getEntry和set方法比较简单,直接在table中找key,如果找到了,把弱引用断了做一次段清理。

ThreadLocal与内存泄漏

关于ThreadLocal是否会引起内存泄漏也是一个比较有争议性的问题,其实就是要看对内存泄漏的准确定义是什么。
认为ThreadLocal会引起内存泄漏的说法是因为如果一个ThreadLocal对象被回收了,我们往里面放的value对于【当前线程->当前线程的threadLocals(ThreadLocal.ThreadLocalMap对象)->Entry数组->某个entry.value】这样一条强引用链是可达的,因此value不会被回收。
认为ThreadLocal不会引起内存泄漏的说法是因为ThreadLocal.ThreadLocalMap源码实现中自带一套自我清理的机制。

之所以有关于内存泄露的讨论是因为在有线程复用如线程池的场景中,一个线程的寿命很长,大对象长期不被回收影响系统运行效率与安全。如果线程不会复用,用完即销毁了也不会有ThreadLocal引发内存泄露的问题。《Effective Java》一书中的第6条对这种内存泄露称为unintentional object retention(无意识的对象保留)。

当我们仔细读过ThreadLocalMap的源码,我们可以推断,如果在使用的ThreadLocal的过程中,显式地进行remove是个很好的编码习惯,这样是不会引起内存泄漏。
那么如果没有显式地进行remove呢?只能说如果对应线程之后调用ThreadLocal的get和set方法都有很高的概率会顺便清理掉无效对象,断开value强引用,从而大对象被收集器回收。

如果使用ThreadLocal的set方法之后,没有显示的调用remove方法,就有可能发生内存泄露,所以养成良好的编程习惯十分重要,使用完ThreadLocal之后,记得调用remove方法。

1
2
3
4
5
6
7
ThreadLocal<String> localName = new ThreadLocal();
try {
localName.set("Test");
// 其它业务逻辑
} finally {
localName.remove();
}

但无论如何,我们应该考虑到何时调用ThreadLocal的remove方法。一个比较熟悉的场景就是对于一个请求一个线程的server如tomcat,在代码中对web api作一个切面,存放一些如用户名等用户信息,在连接点方法结束后,再显式调用remove。