目录
集合框架的线程安全
Java 提供了不同层面的线程安全支持。在传统集合框架内部,除了 Hashtable 等同步容器,还提供了所谓的同步包装器(Synchronized Wrapper),我们可以调用 Collections 工具类提供的包装方法,来获取一个同步的包装容器(如 Collections.synchronizedMap),但是它们都是利用非常粗粒度的同步方式,在高并发情况下,性能比较低下。
线程安全容器类:
- 并发容器: ConcurrentHashMap CopyOnWriteArrayList
- 线程安全队列: ArrayBlockingQueue SynchronousQueue
- 各种有序容器的线程安全版本.
- 理解基本线程安全工具
- 理解传统集合框架并发中Map存在的问题.
- 梳理并发包内,ConcurrentHashMap采用了那些方法提高并发表现.
- 掌握ConcurrentHashMap自身的演进.
Unsafe类并不建议使用,其类似C中指针,直接操作内存.
ConcurrentHashMap
HashTable
本身低效,将各种方法用Synchronized
.所有并发操作都竞争同一把锁.
jdk7ConcurrentHashMap
- 分离锁: 将内部进行分段(Segment),里面则是HashEntry的数组,同HashMap类似.hash相同的条目也是以链表形式存放.
- HashEntry 内部调用volatile的value保证可见性.利用了不可边对象机制改进Unsafe提供的底层能力.如volatile access,去直接完成部分操作,以最优化性能,Unsafe中的很多操作都是JVM intrinsic 优化过的.
jdk7
1 | // get操作保证可见性,没有同步逻辑 |
在并发写操作时
- concurentHashMap 获取
再入锁
,保证数据一致性,Segment本身基于ReentrantLock扩展,所以,在并发修改期间,相应Segment被锁定. - ConcurrentHashMap 的扩容是对Segment进行扩容.
- 最初阶段,进行重复性扫描,确保Key已存在数组里面.进而决定更新还是放置操作.重复扫描,检测冲突是ConcurrentHashMap常见技巧.
- size方法通过重试机制(RETRIES_BEFORE_LOCK,指定重试次数2),获取可靠值.如果监控到变化(对比Segment.modCount),就直接返回,否则取锁进行操作.
jdk8ConcurrentHashMap:
- 总体结构类似HashMap,同步粒度更细致一些
- 内部仍然使用Segment,但是仅仅为了保证序列化的兼容性,不再有任何结构上的用处
- 不再使用Segment,初始化操作简化,改为lazy-load形式.避免初始化开销
- 数据存储利用volatile保证可见性.
- 使用CAS等操作,在特定场景进行无锁并发.
- 使用Unsafe,LongAdder之类底层手段,进行极端情况的优化.
- 在ConcurrentHashMap中,随处可以看到U, 大量使用了U.compareAndSwapXXX的方法,这个方法是利用一个CAS算法实现无锁化的修改值的操作,他可以大大降低锁代理的性能消耗。这个算法的基本思想就是不断地去比较当前内存中的变量值与你指定的一个变量值是否相等,如果相等,则接受你指定的修改的值,否则拒绝你的操作。因为当前线程中的值已经不是最新的值,你的修改很可能会覆盖掉其他线程修改的结果。这一点与乐观锁,SVN的思想是比较类似的。
重要的属性
SizeCtl
- 负数代表正在初始化或者扩容
- -1 初始化
- -N表示有N-1个线程在扩容.
- >=0 hash表未被初始化.表示初始化或者下一次扩容的大小.值始终是当前ConcurrentHashMap容量的0.75倍,这与loadfactor是对应的。
int MOVED = -1;
static final, hash for forwarding nodes
int TREEBIN = -2;
static final, hash for roots of trees
int RESERVED = -3;
static final, hash for transient reservations
int HASH_BITS = 0x7fffffff;
usable bits of normal node hash
类
Node
包装了Key-Value键值对,Value 和 next属性设置了volatile。不允许setValue直接设置Node的value域(throw unsupportedOperationException),增加了find方法辅助map.get()TreeNode
树节点类,另外一个核心的数据结构。当链表长度过长的时候,会转换为TreeNode。但是与HashMap不相同的是,它并不是直接转换为红黑树,而是把这些结点包装成TreeNode放在TreeBin对象中,由TreeBin完成对红黑树的包装。而且TreeNode在ConcurrentHashMap集成自Node类,而并非HashMap中的集成自LinkedHashMap.Entry<K,V>类,也就是说TreeNode带有next指针,这样做的目的是方便基于TreeBin的访问。TreeBin
这个类并不负责包装用户的key、value信息,而是包装的很多TreeNode节点。它代替了TreeNode的根节点,也就是说在实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象,这是与HashMap的区别。另外这个类还带有了读写锁。hash = -2ForwardingNode
一个用于连接两个table的节点类。它包含一个nextTable指针,用于指向下一张表。而且这个节点的key value next指针全部为null,它的hash值为-1. 这里面定义的find的方法是从nextTable里进行查询节点,而不是以自身为头节点进行查找。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36/**
* A node inserted at head of bins during transfer operations.
*/
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;
ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}
Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}
}核心方法
ConcurrentHashMap定义了三个原子操作,用于对指定位置的节点进行操作。正是这些原子操作保证了ConcurrentHashMap的线程安全。
1 | //获得在i位置上的Node节点 |
initTable()
初始化操作实现在 initTable 里面,这是一个典型的 CAS 使用场景,利用volatile
的 sizeCtl 作为互斥手段:如果发现竞争性的初始化,就 spin 在那里,等待条件恢复;否则利用 CAS 设置排他标志。如果成功则进行初始化;否则重试。
1 |
|
transfer()
ConcurrentHashMap扩容时,支持多线程扩容(没有加锁),利用并发减少扩容的影响(扩容总会复制数组)
- 扩容的步骤
- 每个线程处理区间,默认16
- 初始化临时nextTable,扩容2倍
- 死循环,计算下标
- 如果有数据,同步转移。
1 | /** |
putval()
总体流程:
死循环:
- 为空 初始化,
- 不为空,下标节点为空,cas插入,退出。
- hash冲突,目标hash == -1,为占位fwd,则正在扩容。调用 helpTransfer()
- hash冲突,hash != -1,锁住目标节点,且下标节点没有变化
- 如果hash>0(红黑树的hash是 -2),记录链表长度,判断key是否相同,复写oldValue或者插入末尾
- 如果为红黑树,记录链表长度,插入节点或者复写
- 判断长度(是否大于8),选择树化或退化链表
- 将数组加1(CAS方式),如果需要扩容,则调用 transfer 方法进行移动和重新散列,该方法中,如果是槽中只有单个节点,则使用CAS直接插入,如果不是,则使用 synchronized 进行同步,防止并发成环。
1 | static class Node<K,V> implements Map.Entry<K,V> { |
addcount()
功能:
- 对 table 的长度加一。无论是通过修改 baseCount,还是通过使用 CounterCell。当 CounterCell 被初始化了,就优先使用他,不再使用 baseCount。
- 检查是否需要扩容,或者是否正在扩容。如果需要扩容,就调用扩容方法,如果正在扩容,就帮助其扩容。
总体流程:
- 判断计数盒子属性是否是空,如果是空,就尝试修改 baseCount 变量,对该变量进行加 X。
- 如果计数盒子不是空,或者修改 baseCount 变量失败了,则放弃对 baseCount 进行操作。
- 如果计数盒子是 null 或者计数盒子的 length 是 0,或者随机取一个位置取于数组长度是 null,那么就对刚刚的元素进行 CAS 赋值。
- 如果赋值失败,或者满足上面的条件,则调用 fullAddCount 方法重新死循环插入。
- 这里如果操作 baseCount 失败了(或者计数盒子不是 Null),且对计数盒子赋值成功,那么就检查 check 变量,如果该变量小于等于 1. 直接结束。否则,计算一下 count 变量。
- 如果 check 大于等于 0 ,说明需要对是否扩容进行检查。
- 如果 map 的 size 大于 sizeCtl(扩容阈值),且 table 的长度小于 1 << 30,那么就进行扩容。
- 根据 length 得到一个标识符,然后,判断 sizeCtl 状态,如果小于 0 ,说明要么在初始化,要么在扩容。
- 如果正在扩容,那么就校验一下数据是否变化了(具体可以看上面代码的注释)。如果检验数据不通过,break。
- 如果校验数据通过了,那么将 sizeCtl 加一,表示多了一个线程帮助扩容。然后进行扩容。
- 如果没有在扩容,但是需要扩容。那么就将 sizeCtl 更新,赋值为标识符左移 16 位 —— 一个负数。然后加 2。 表示,已经有一个线程开始扩容了。然后进行扩容。然后再次更新 count,看看是否还需要扩容。
1 |
|
helptransfer()
功能:
在 putVal 方法中,如果发现线程当前 hash 冲突了,也就是当前 hash 值对应的槽位有值了,且如果这个值是 -1 (MOVED),说明 Map 正在扩容。
那么就帮助 Map 进行扩容。以加快速度。
总体流程:
- 判 tab 空,判断是否是转移节点。判断 nextTable 是否更改了。
- 更加 length 得到标识符。
- 判断是否并发修改了,判断是否还在扩容。
- 如果还在扩容,判断标识符是否变化,判断扩容是否结束,判断是否达到最大线程数,判断扩容转移下标是否在调整(扩容结束),如果满足任意条件,结束循环。
- 如果不满足,并发转移。
1 | // 为了移动节点准备的常量. |