• java
  • go
  • 数据库
  • linux
  • 中间件
  • 书
  • 源码
  • 夕拾

  • java
  • go
  • 数据库
  • linux
  • 中间件
  • 书
  • 源码
  • 夕拾

ConcurrentHashMap

目录

  • 目录
  • 集合框架的线程安全
  • ConcurrentHashMap
    • jdk7ConcurrentHashMap
    • jdk8ConcurrentHashMap:
      • 重要的属性
      • 类
      • 核心方法
      • initTable()
      • transfer()
      • putval()
      • addcount()
      • helptransfer()

集合框架的线程安全

Java 提供了不同层面的线程安全支持。在传统集合框架内部,除了 Hashtable 等同步容器,还提供了所谓的同步包装器(Synchronized Wrapper),我们可以调用 Collections 工具类提供的包装方法,来获取一个同步的包装容器(如 Collections.synchronizedMap),但是它们都是利用非常粗粒度的同步方式,在高并发情况下,性能比较低下。

线程安全容器类:

  1. 并发容器: ConcurrentHashMap CopyOnWriteArrayList
  2. 线程安全队列: ArrayBlockingQueue SynchronousQueue
  3. 各种有序容器的线程安全版本.
  • 理解基本线程安全工具
  • 理解传统集合框架并发中Map存在的问题.
  • 梳理并发包内,ConcurrentHashMap采用了那些方法提高并发表现.
  • 掌握ConcurrentHashMap自身的演进.

Unsafe类并不建议使用,其类似C中指针,直接操作内存.

ConcurrentHashMap

HashTable本身低效,将各种方法用Synchronized.所有并发操作都竞争同一把锁.

jdk7ConcurrentHashMap

  • 分离锁: 将内部进行分段(Segment),里面则是HashEntry的数组,同HashMap类似.hash相同的条目也是以链表形式存放.
  • HashEntry 内部调用volatile的value保证可见性.利用了不可边对象机制改进Unsafe提供的底层能力.如volatile access,去直接完成部分操作,以最优化性能,Unsafe中的很多操作都是JVM intrinsic 优化过的.

jdk7

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// get操作保证可见性,没有同步逻辑
public V get(Object key) {
Segment<K,V> s; // manually integrate access methods to reduce overhead
HashEntry<K,V>[] tab;
int h = hash(key.hashCode());
// 利用位操作替换普通数学运算
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// 以 Segment 为单位,进行定位
// 利用 Unsafe 直接进行 volatile access
if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
(tab = s.table) != null) {
// 省略
}
return null;
}
public V put(K key, V value) {
Segment<K,V> s;
if (value == null)
throw new NullPointerException();
// 二次哈希,以保证数据的分散性,避免哈希冲突
int hash = hash(key.hashCode());
int j = (hash >>> segmentShift) & segmentMask;
if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck
(segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
s = ensureSegment(j);
return s.put(key, hash, value, false);
}


final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// scanAndLockForPut 会去查找是否有 key 相同 Node
// 无论如何,确保获取锁
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K,V>[] tab = table;
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
if (e != null) {
K k;
// 更新已有 value...
}
else {
// 放置 HashEntry 到特定位置,如果超过阈值,进行 rehash
// ...
}
}
} finally {
unlock();
}
return oldValue;
}

在并发写操作时

  • concurentHashMap 获取再入锁,保证数据一致性,Segment本身基于ReentrantLock扩展,所以,在并发修改期间,相应Segment被锁定.
  • ConcurrentHashMap 的扩容是对Segment进行扩容.
  • 最初阶段,进行重复性扫描,确保Key已存在数组里面.进而决定更新还是放置操作.重复扫描,检测冲突是ConcurrentHashMap常见技巧.
  • size方法通过重试机制(RETRIES_BEFORE_LOCK,指定重试次数2),获取可靠值.如果监控到变化(对比Segment.modCount),就直接返回,否则取锁进行操作.

jdk8ConcurrentHashMap:

  • 总体结构类似HashMap,同步粒度更细致一些
  • 内部仍然使用Segment,但是仅仅为了保证序列化的兼容性,不再有任何结构上的用处
  • 不再使用Segment,初始化操作简化,改为lazy-load形式.避免初始化开销
  • 数据存储利用volatile保证可见性.
  • 使用CAS等操作,在特定场景进行无锁并发.
  • 使用Unsafe,LongAdder之类底层手段,进行极端情况的优化.
  • 在ConcurrentHashMap中,随处可以看到U, 大量使用了U.compareAndSwapXXX的方法,这个方法是利用一个CAS算法实现无锁化的修改值的操作,他可以大大降低锁代理的性能消耗。这个算法的基本思想就是不断地去比较当前内存中的变量值与你指定的一个变量值是否相等,如果相等,则接受你指定的修改的值,否则拒绝你的操作。因为当前线程中的值已经不是最新的值,你的修改很可能会覆盖掉其他线程修改的结果。这一点与乐观锁,SVN的思想是比较类似的。

重要的属性

SizeCtl

  1. 负数代表正在初始化或者扩容
  2. -1 初始化
  3. -N表示有N-1个线程在扩容.
  4. >=0 hash表未被初始化.表示初始化或者下一次扩容的大小.值始终是当前ConcurrentHashMap容量的0.75倍,这与loadfactor是对应的。

int MOVED = -1;

static final, hash for forwarding nodes

int TREEBIN = -2;

static final, hash for roots of trees

int RESERVED = -3;

static final, hash for transient reservations

int HASH_BITS = 0x7fffffff;

usable bits of normal node hash

类

  1. Node
    包装了Key-Value键值对,Value 和 next属性设置了volatile。不允许setValue直接设置Node的value域(throw unsupportedOperationException),增加了find方法辅助map.get()

  2. TreeNode
    树节点类,另外一个核心的数据结构。当链表长度过长的时候,会转换为TreeNode。但是与HashMap不相同的是,它并不是直接转换为红黑树,而是把这些结点包装成TreeNode放在TreeBin对象中,由TreeBin完成对红黑树的包装。而且TreeNode在ConcurrentHashMap集成自Node类,而并非HashMap中的集成自LinkedHashMap.Entry<K,V>类,也就是说TreeNode带有next指针,这样做的目的是方便基于TreeBin的访问。

  3. TreeBin
    这个类并不负责包装用户的key、value信息,而是包装的很多TreeNode节点。它代替了TreeNode的根节点,也就是说在实际的ConcurrentHashMap“数组”中,存放的是TreeBin对象,而不是TreeNode对象,这是与HashMap的区别。另外这个类还带有了读写锁。hash = -2

  4. ForwardingNode
    一个用于连接两个table的节点类。它包含一个nextTable指针,用于指向下一张表。而且这个节点的key value next指针全部为null,它的hash值为-1. 这里面定义的find的方法是从nextTable里进行查询节点,而不是以自身为头节点进行查找。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    /**
    * A node inserted at head of bins during transfer operations.
    */
    static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
    super(MOVED, null, null, null);
    this.nextTable = tab;
    }

    Node<K,V> find(int h, Object k) {
    // loop to avoid arbitrarily deep recursion on forwarding nodes
    outer: for (Node<K,V>[] tab = nextTable;;) {
    Node<K,V> e; int n;
    if (k == null || tab == null || (n = tab.length) == 0 ||
    (e = tabAt(tab, (n - 1) & h)) == null)
    return null;
    for (;;) {
    int eh; K ek;
    if ((eh = e.hash) == h &&
    ((ek = e.key) == k || (ek != null && k.equals(ek))))
    return e;
    if (eh < 0) {
    if (e instanceof ForwardingNode) {
    tab = ((ForwardingNode<K,V>)e).nextTable;
    continue outer;
    }
    else
    return e.find(h, k);
    }
    if ((e = e.next) == null)
    return null;
    }
    }
    }
    }

    核心方法

    ConcurrentHashMap定义了三个原子操作,用于对指定位置的节点进行操作。正是这些原子操作保证了ConcurrentHashMap的线程安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
//获得在i位置上的Node节点
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
//利用CAS算法设置i位置上的Node节点。之所以能实现并发是因为他指定了原来这个节点的值是多少
//在CAS算法中,会比较内存中的值与你指定的这个值是否相等,如果相等才接受你的修改,否则拒绝你的修改
//因此当前线程中的值并不是最新的值,这种修改可能会覆盖掉其他线程的修改结果
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
//利用volatile方法设置节点位置的值
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

initTable()

初始化操作实现在 initTable 里面,这是一个典型的 CAS 使用场景,利用volatile的 sizeCtl 作为互斥手段:如果发现竞争性的初始化,就 spin 在那里,等待条件恢复;否则利用 CAS 设置排他标志。如果成功则进行初始化;否则重试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35

/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
//sizeCtl表示有其他线程正在进行初始化操作,把线程挂起。
//对于table的初始化工作,只能有一个线程在进行。
// sizeCtl<0 时,有其他线程在更改table
if ((sc = sizeCtl) < 0)
// 自旋
Thread.yield(); // lost initialization race; just spin
// cas 设置 sc 为 -1 ,表示本线程正在进行初始化
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
// table 为空
if ((tab = table) == null || tab.length == 0) {
// sc未自定义,则设置默认值 DEFAULT_CAPACITY = 16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
//创建数组
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//sizeCtl 计算后作为扩容的阀值,相当于0.75*n
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}

transfer()

ConcurrentHashMap扩容时,支持多线程扩容(没有加锁),利用并发减少扩容的影响(扩容总会复制数组)

  • 扩容的步骤
  1. 每个线程处理区间,默认16
  2. 初始化临时nextTable,扩容2倍
  3. 死循环,计算下标
  4. 如果有数据,同步转移。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
/**
* The next table index (plus one) to split while resizing.
*/
private transient volatile int transferIndex;

/**
* Moves and/or copies the nodes in each bin to new table. See
* above for explanation.
*/
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {

int n = tab.length, stride;
// 将 length/8 ,然后除以 CPU核心数。如果得到的结果小于 16,那么就使用 16。
// 这里的目的是让每个 CPU 处理的桶一样多,避免出现转移任务不均匀的现象
// 如果桶较少的话,默认一个 CPU(一个线程)处理 16 个桶
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) // MIN_TRANSFER_STRIDE = 16
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
// nextTab扩容2倍
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
// 扩容失败,sizeCtl设置为Integer最大值。
sizeCtl = Integer.MAX_VALUE;
return;
}
// 更新变量
nextTable = nextTab;
// 更新转移下标,就是 老的 tab 的 length
transferIndex = n;
}
// 新tab的length
int nextn = nextTab.length;
//构造一个连节点指针 用于标志位,当别的线程发现这个槽位中是 fwd 类型的节点,则跳过这个节点。
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);

// 首次推进为 true,如果等于 true,说明这个节点已经处理过,需要再次推进一个下标(i--)
// 反之,如果是 false,那么就不能推进下标,需要将当前的下标处理完毕才能继续推进
boolean advance = true;
// to ensure sweep before committing nextTab
// 完成状态标识
boolean finishing = false;

// 这个while循环体的作用就是在控制i-- ,从大到小处理,i就是数组下标()
// 通过i--可以依次遍历原hash表中的节点
// 死循环开始转移。多线程并发转移就是在这个死循环中,根据 finishing 变量来判断,该变量为 true 表示扩容结束,否则继续扩容。
// i 参数,当前区间处理的最大下标
// bound 参数,这个参数指的是该线程此次可以处理的区间的最小下标,超过这个下标,就需要重新领取区间或者结束扩容
// 死循环
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 如果可以向后遍历,控制i递减
// 同时每个线程都会进入这里取得自己需要转移的桶的区间
while (advance) {
// 对 i 减一,判断是否大于等于 bound (正常情况下,如果大于 bound 不成立,说明该线程上次领取的任务已经完成了。那么,需要在下面继续领取任务)
// 如果对 i 减一大于等于 bound(还需要继续做任务),或者完成了,修改推进状态为 false,不能推进了。任务成功后修改推进状态为 true。
// 通常,第一次进入循环,i-- 这个判断会无法通过,从而走下面的 nextIndex 赋值操作(获取最新的转移下标)。
// 其余情况都是:如果可以推进,将 i 减一,然后修改成不可推进。
// 如果 i 对应的桶处理成功了,改成可以推进。
int nextIndex, nextBound;
// 对i-1,
if (--i >= bound || finishing)
// 这里设置 false,是为了防止在没有成功处理一个桶的情况下却进行了推进
advance = false;
// 这里的目的是:
// 1. 当一个线程进入时,会选取最新的转移下标。
// 2. 当一个线程处理完自己的区间时,如果还有剩余区间的没有别的线程处理。再次获取区间。
// 如果小于等于0,说明没有区间了 ,i 改成 -1
// 推进状态变成 false,不再推进,表示,扩容结束了,当前线程可以退出了
else if ((nextIndex = transferIndex) <= 0) {
// 这个 -1 会在下面的 if 块里判断,从而进入完成状态判断
i = -1;
advance = false;
}
// CAS 修改 transferIndex,即 length - 区间值,留下剩余的区间值供后面的线程使用
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
// 这个值就是当前线程可以处理的当前区间最小下标
bound = nextBound;
// 初次对i 赋值,这个就是当前线程可以处理的当前区间的最大下标
i = nextIndex - 1;
// 这里设置 false,退出死循环,是为了防止在没有成功处理一个桶的情况下却进行了推进,这样对导致漏掉某个桶。
// 下面的 if (tabAt(tab, i) == f) 判断会出现这样的情况。
advance = false;
}
}
// 如果 i 小于0 (不在 tab 下标内,按照上面的判断,领取最后一段区间的线程扩容结束)
// 如果 i >= tab.length
// 如果 i + tab.length >= nextTable.length
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//如果所有的节点都已经完成复制工作
//就把nextTable赋值给table 清空临时对象nextTable
if (finishing) {
// 置空变量
nextTable = null;
// 更新table
table = nextTab;
// 更新阈值
sizeCtl = (n << 1) - (n >>> 1);
return;
}
//利用CAS方法更新这个扩容阈值
// 尝试将 sc -1. 表示这个线程结束帮助扩容了,将 sc 的低 16 位减一。
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 如果 sc - 2 不等于标识符左移 16 位。
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
// 不相等,说明没结束,当前线程结束方法。
return;
// 如果他们相等了,说明没有线程在帮助他们扩容了。也就是说,扩容结束了。
finishing = advance = true;
// 重新检查整张表
i = n;
}
}
//如果遍历到的节点为空 则用fwd 占位,标记此节点已经处理.
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
//如果遍历到ForwardingNode节点
//说明这个点已经被处理过了 继续推进 这里是控制并发扩容的核心
// 如果不是 null 且 hash 值是 MOVED。
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
// 到这里,说明这个位置有实际值了,且不是占位符。对这个节点上锁。为什么上锁,防止 putVal 的时候向链表插入数据
else {
//节点上锁
synchronized (f) {
// 判断 i 下标处的桶节点是否和 f 相同
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn; // low Node, height Node 低位节点,高位节点
// 如果fh>=0 证明这是一个node节点

if (fh >= 0) {
// 由于 Map 的长度都是 2 的次方(000001000 这类的数字),那么取于 length 只有 2 种结果,一种是 0,一种是1
// 如果是结果是0 ,其放在低位,反之放在高位,将链表重新 hash,放到对应的位置上,让新的取余算法能够命中
int runBit = fh & n;
Node<K,V> lastRun = f;
// 遍历桶
for (Node<K,V> p = f.next; p != null; p = p.next) {
// 取余桶的hash
int b = p.hash & n;
// 如果节点的 hash 值和首节点的 hash 值取于结果不同
if (b != runBit) {
// 更新 runBit,用于下面判断 lastRun 该赋值给 ln 还是 hn。
runBit = b;
// 这个 lastRun 保证后面的节点与自己的取于值相同,避免后面没有必要的循环
lastRun = p;
}
}
// 如果最后更新的 runBit 是 0 ,设置低位节点
if (runBit == 0) {
ln = lastRun;
hn = null;
}
//// 如果最后更新的 runBit 是 1, 设置高位节点
else {
hn = lastRun;
ln = null;
}
// 再次循环,生成两个链表,lastRun 作为停止条件,这样就是避免无谓的循环(lastRun 后面都是相同的取于结果)
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
// 如果与运算结果是 0,那么就还在低位
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
// 1 则创建高位
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 设置低位链表放在新链表的 i
setTabAt(nextTab, i, ln);
// 设置高位链表,在原有长度上加 n
setTabAt(nextTab, i + n, hn);
// 将旧的链表设置成占位符
setTabAt(tab, i, fwd);
// 继续向后推进
advance = true;
}
// TreeBin 的 hash 是 -2,是 treeBin,且hash < 0
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
// 遍历
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
// 与运算 == 0 的放在低位
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
// 不是 0 的放在高位
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 如果树的节点数小于等于 6,那么转成链表,反之,创建一个新的树
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
// 低位树
setTabAt(nextTab, i, ln);
// 高位树
setTabAt(nextTab, i + n, hn);
// 旧的设置成占位符
setTabAt(tab, i, fwd);
// 继续向后推进
advance = true;
}
}
}
}
}
}

putval()

总体流程:
死循环:

  1. 为空 初始化,
  2. 不为空,下标节点为空,cas插入,退出。
  3. hash冲突,目标hash == -1,为占位fwd,则正在扩容。调用 helpTransfer()
  4. hash冲突,hash != -1,锁住目标节点,且下标节点没有变化
    1. 如果hash>0(红黑树的hash是 -2),记录链表长度,判断key是否相同,复写oldValue或者插入末尾
    2. 如果为红黑树,记录链表长度,插入节点或者复写
    3. 判断长度(是否大于8),选择树化或退化链表
  5. 将数组加1(CAS方式),如果需要扩容,则调用 transfer 方法进行移动和重新散列,该方法中,如果是槽中只有单个节点,则使用CAS直接插入,如果不是,则使用 synchronized 进行同步,防止并发成环。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
 static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
// …
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
// 死循环
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh; K fk; V fv;
// 为空 则初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
// 获取下标节点,如果空, cas 插入
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value)))
break;
}
// 槽位有node,且 hash 值为 -1(MOVED)
// 说明是 ForwardingNode 对象(这是一个占位符对象,表示正在扩容--在transfer()中设置)
// 帮助扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);
// 如果 hash 冲突了,且 hash 值不为 -1
else {
V oldVal = null;
// 锁住 f 节点,防止增加链表的时候导致链表成环
synchronized (f) {
// 如果对应的下标位置 的节点没有改变
if (tabAt(tab, i) == f) {
//f的hash值大于等0.
if (fh >= 0) {
//链表初始长度
binCount = 1;
//死循环,将值添加到链表尾部.计算链表长度
for (Node<K,V> e = f;; ++binCount) {
K ek;
// key 重复,判断onlyIfAbsent,value覆盖.
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key, value, null);
break;
}
}
}
// 如果f是树类型,且f.hash<0
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
// 链表长度大于等于8时,将该节点改成红黑树树
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
// 对hash表进行计数
// 从 putVal 传入的参数是 1,
// binCount,binCount 默认是0,只有 hash 冲突了才会大于 1.
// bincount的大小是链表的长度(如果不是红黑数结构的话)。
addCount(1L, binCount);
return null;
}

addcount()

功能:

  1. 对 table 的长度加一。无论是通过修改 baseCount,还是通过使用 CounterCell。当 CounterCell 被初始化了,就优先使用他,不再使用 baseCount。
  2. 检查是否需要扩容,或者是否正在扩容。如果需要扩容,就调用扩容方法,如果正在扩容,就帮助其扩容。

总体流程:

  1. 判断计数盒子属性是否是空,如果是空,就尝试修改 baseCount 变量,对该变量进行加 X。
  2. 如果计数盒子不是空,或者修改 baseCount 变量失败了,则放弃对 baseCount 进行操作。
  3. 如果计数盒子是 null 或者计数盒子的 length 是 0,或者随机取一个位置取于数组长度是 null,那么就对刚刚的元素进行 CAS 赋值。
  4. 如果赋值失败,或者满足上面的条件,则调用 fullAddCount 方法重新死循环插入。
  5. 这里如果操作 baseCount 失败了(或者计数盒子不是 Null),且对计数盒子赋值成功,那么就检查 check 变量,如果该变量小于等于 1. 直接结束。否则,计算一下 count 变量。
  6. 如果 check 大于等于 0 ,说明需要对是否扩容进行检查。
  7. 如果 map 的 size 大于 sizeCtl(扩容阈值),且 table 的长度小于 1 << 30,那么就进行扩容。
  8. 根据 length 得到一个标识符,然后,判断 sizeCtl 状态,如果小于 0 ,说明要么在初始化,要么在扩容。
  9. 如果正在扩容,那么就校验一下数据是否变化了(具体可以看上面代码的注释)。如果检验数据不通过,break。
  10. 如果校验数据通过了,那么将 sizeCtl 加一,表示多了一个线程帮助扩容。然后进行扩容。
  11. 如果没有在扩容,但是需要扩容。那么就将 sizeCtl 更新,赋值为标识符左移 16 位 —— 一个负数。然后加 2。 表示,已经有一个线程开始扩容了。然后进行扩容。然后再次更新 count,看看是否还需要扩容。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82


/**
* Base counter value, used mainly when there is no contention,
* but also as a fallback during table initialization
* races. Updated via CAS.
*/
private transient volatile long baseCount;
// Unsafe mechanics
private static final sun.misc.Unsafe U;
private static final long SIZECTL;
private static final long TRANSFERINDEX;
private static final long BASECOUNT;
private static final long CELLSBUSY;
private static final long CELLVALUE;
private static final long ABASE;
private static final int ASHIFT;
/**
* @param x 此次需要对表中元素个数加 X
* @param check <0 不需要检查,大于等于0 需要检查,putval中 binCount最低为0,所以putval每次都要检查。
*/
private final void addCount(long x, int check) {
// 计数器
CounterCell[] as; long b, s;
// 计数器不为空
// 修改baseCount 失败。
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
// 如果计数盒子是空(尚未出现并发)
// 如果随机取余一个数组位置为空 或者
// 修改这个槽位的变量失败(出现并发了)
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
// 执行 fullAddCount 方法。并结束
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 如果map.size() 大于 sizeCtl(达到扩容阈值需要扩容)
// 且table 不是空;且 table 的长度小于 MAXIMUM_CAPACITY(1 << 30)。(可以扩容)
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 根据 length 得到一个标识
int rs = resizeStamp(n);// return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
if (sc < 0) {
// 如果 sc 的低 16 位不等于 标识符(校验异常 sizeCtl 变化了)
// 如果 sc == 标识符 + 1 (扩容结束了,不再有线程进行扩容)
// (默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1)
// 如果 sc == 标识符 + 65535(帮助线程数已经达到最大)
// 如果 nextTable == null(结束扩容了)
// 如果 transferIndex <= 0 (转移状态变化了)
// 结束循环
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 如果可以帮助扩容,那么将 sc 加 1. 表示多了一个线程在帮助扩容
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
// 如果不再扩容,将 sc 标识符左移 16 位 然后 + 2,变成一个负数。
// 高 16 位是标识符,低 16 位初始是 2.
// 第一次调用扩容方法前,sizeCtl 的低 16 位是加 2 的,不是加一。
// 所以 sc == rs + 1 的判断是表示是否完成任务了。
// 因为完成扩容后,sizeCtl == rs + 1。
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
// 更新sc为负数,然后扩容。
transfer(tab, null);
s = sumCount();
}
}
}

helptransfer()

功能:
在 putVal 方法中,如果发现线程当前 hash 冲突了,也就是当前 hash 值对应的槽位有值了,且如果这个值是 -1 (MOVED),说明 Map 正在扩容。
那么就帮助 Map 进行扩容。以加快速度。

总体流程:

  1. 判 tab 空,判断是否是转移节点。判断 nextTable 是否更改了。
  2. 更加 length 得到标识符。
  3. 判断是否并发修改了,判断是否还在扩容。
  4. 如果还在扩容,判断标识符是否变化,判断扩容是否结束,判断是否达到最大线程数,判断扩容转移下标是否在调整(扩容结束),如果满足任意条件,结束循环。
  5. 如果不满足,并发转移。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// 为了移动节点准备的常量.
static final int MOVED = -1;

ForwardingNode(Node<K,V>[] tab) {
super(MOVED, null, null, null);
this.nextTable = tab;
}

/**
* Helps transfer if a resize is in progress.
*/
//传入的参数是:成员变量 table 和 对应槽位的 f 变量。
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
// table 不为空,node是fwd转移节点,且node节点的nextTable(新table) 非空
//帮助扩容开始
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
//根据length获得标识符
int rs = resizeStamp(tab.length);
//nextTab 没有并发修改,tab也没有并发修改
// 且 sizeCtl<0 在扩容中
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
// sizeCtl 该变量高 16 位保存 length 生成的标识符,低 16 位保存并发扩容的线程数,通过这连个数字,可以判断出,是否结束扩容了。
// 如果 sizeCtl 无符号右移 16 不等于 rs ( sc前 16 位如果不等于标识符,则标识符变化了)
// 或者 sizeCtl == rs + 1 (扩容结束了,不再有线程进行扩容)
// (默认第一个线程设置 sc ==rs 左移 16 位 + 2,当第一个线程结束扩容了,就会将 sc 减一。这个时候,sc 就等于 rs + 1)
// 或者 sizeCtl == rs + 65535 (如果达到最大帮助线程的数量,即 65535)
// 或者转移下标正在调整 (扩容结束)
// 结束循环,返回 table
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;
// 如果以上都不是, 将 sizeCtl + 1, (表示增加了一个线程帮助其扩容)
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
// 进行转移(扩容)
transfer(tab, nextTab);
// 结束
break;
}
}
return nextTab;
}
return table;
}

aop流程
ioc流程
  1. 1. 目录
  2. 2. 集合框架的线程安全
  3. 3. ConcurrentHashMap
    1. 3.1. jdk7ConcurrentHashMap
    2. 3.2. jdk8ConcurrentHashMap:
      1. 3.2.1. 重要的属性
      2. 3.2.2. 类
      3. 3.2.3. 核心方法
      4. 3.2.4. initTable()
      5. 3.2.5. transfer()
      6. 3.2.6. putval()
      7. 3.2.7. addcount()
      8. 3.2.8. helptransfer()
© 2023 haoxp
Hexo theme