• java
  • go
  • 数据库
  • linux
  • 中间件
  • 书
  • 源码
  • 夕拾

  • java
  • go
  • 数据库
  • linux
  • 中间件
  • 书
  • 源码
  • 夕拾

【源码阅读】AQS-jdk8

目录

  • 目录
  • 简介
  • 重要的方法
    • 可重写的方法
    • 同步状态的相关方法
  • 源码浅析
    • 简介
    • node
    • 独占式加锁
    • 独占式释放锁

简介

同步队列器(AQS)负责管理同步容器类中的状态,它管理了一个整数状态信息,可以通过getState(),setState()以及compareAndSetState()等方法,进行操作.这个整数可以表示任意状态.

  • reentrantLock用它表示所有线程已经重复获取该锁的次数.
  • Semaphore用它表示剩余许可的数量
  • FutureTask表示任务状态.

AQS的管理同步状态的同步队列

  • 当线程获取同步状态失败时,将当前线程以及等待状态信息构造成一个Node节点,将其加入到同步队列尾部,阻塞该线程.
  • 当同步状态被释放,唤醒同步队列的首节点线程,获取同步状态.

重要的方法

可重写的方法

方法名 描述
protected boolean tryAcquire(int arg) 独占式获取同步状态
protected boolean tryRelease(int arg) 独占式释放同步状态
protected int tryAcquireShared(int arg) 共享式获取同步状态,>=0成功,反之失败
protected boolean tryReleaseShared(int arg) 共享式 释放同步状态
protected boolean isHeldExclusively() 当前同步器是否在独占式下被线程使用,一般该方法表示是否被当前线程所独占

同步状态的相关方法

方法名 描述
getState() 获取当前同步状态
setState() 设置当前同步状态
compareAndSetState(int expect, int update) 使用cas设置同步状态,该方法保证同步状态设置的原子性

源码浅析

简介

AQS,等待队列是一种CLH queue的变体。
CLH 通常用于自旋锁。用来代替使用阻塞同步。

每个节点有个status字段,跟踪现场是否阻塞,当他的前驱节点释放锁,status为SIGNAL。
队列中的每个节点,也作为一个持有单一等待线程的特定监视器

状态字段不控制(线程)是否持有锁

队头元素会尝试获取锁,但是不一定会成功。

进入CLH 锁队列,将原子性的进入队尾,出队,只需要更新头节点,然后确定有效的后继节点。

  • prev主要处理CANCELLED节点
  • next用来实现阻塞机制.前继节点通过遍历可以确定哪个线程(node持有线程id),然后通知节点唤醒.

CLH队列需要一个虚拟的头节点才能开始(懒加载模式创建)

条件队列(condition)使用相同的节点,但是其实现需要一个简单(非并行)的队列,只有在专用时访问,等待将节点插入条件队列.收到信号,将条件队列中的节点转移至主队列.

node

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// 双向链表
// 每一个打算获取锁的线程都在AQS里被包装成为一个Node
// 假如取不到锁,就将Node加入至链表,当某一个线程释放锁,
// 就从链表里取一个等待的node,将锁给它
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
// 超时或者中断,将永远处于此状态,也不会再次被阻塞.代码中有 ws>0 判断,即代表该节点已经出于cancelled
static final int CANCELLED = 1;
// 等待释放资源
// 该节点的后继者被阻塞,当前节点释放或取消时,必须unpark后继节点.
// 为了避免冲突,acquire方法必须指示它们需要signal,然后原子性重试获取,在失败时阻塞
static final int SIGNAL = -1;
// 在条件队列中,等待被唤醒
// 不会被当作同步队列节点
// 与其他用途无关,简化机制
static final int CONDITION = -2;
// 线程处于shared才会被使用
// 传播到其他节点,doReleaseShared中对此进行设置(仅适用于头节点),确保传播继续进行.
static final int PROPAGATE = -3;

// waitStatus以数字化排列简化使用,非负表示不需要发信号,因此多数代码不需要检查特定值.
// 对于普通同步节点,初始化为0,对于条件节点,初始化为CONDITION.
// 使用CAS对其进行修改
volatile int waitStatus;
// 双向链表,前驱节点
volatile Node prev;
// 后继节点
volatile Node next;
// 获取同步状态的线程
volatile Thread thread;
// 指向下一个condition状态的节点
Node nextWaiter;

final boolean isShared() {
return nextWaiter == SHARED;
}

final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}

}

独占式加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223

public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {


private transient volatile Node head; //链表的头指针
private transient volatile Node tail; //链表的尾指针
private volatile int state; ///同步状态

/**
* 以独占模式获取锁,忽略中断,
* 若tryAcquire()失败,则包装成一个node
* tryAcquire是protected方法,AQS中未实现,在其子类中实现.
* 若没有获得锁,调用addWaiter(),以排他模式加入队列节点.
*/
public final void acquire(int arg) {
// 调用自定义同步器重写的 tryAcquire 方法,尝试非阻塞的获取同步状态,如果获取失败(tryAcquire返回false)
// (reentranLock中通过cas进行 tryAcquire)
// 独占式且安全(CAS) 进入同步队列
// traAcquire失败,构造(独占)节点,通过addWaiter方法入队列,然后调用acquireQueued方法
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

/**
* 节点入队列
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
// cas设置tail
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}

// Queuing utilities

/**
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices
* to improve responsiveness with very short timeouts.
*/
static final long spinForTimeoutThreshold = 1000L;

/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
* = =,Doug Lea大神是有在循环中使用if,else的习惯吗,然后达到某条件在跳出
* 这种写法,印象中concurrentHashMap中也有这种写法
*/
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize,初始化节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
// cas方式设置tail
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
* 以独占模式不间断获取队列中的线程。
* 也可用于条件等待
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
// 自旋.
for (;;) {
// 当前节点的前驱节点
final Node p = node.predecessor();
// 只有当前节点的前驱节点为头结点时,尝试获取state
// 头节点释放同步状态,唤醒后继节点,后继节点检测自己前驱节点是否为头节点
//
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// 当前节点的前驱不是头节点.
// 或者获取同步状态失败
// parkAndCheckInterrupt,将线程挂起
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
// 实际异常才会调用此逻辑
if (failed)
cancelAcquire(node);
}
}
// Checks and updates status for a node that failed to acquire.
// Returns true if thread should block. This is the main signal
// control in all acquire loops. Requires that pred == node.prev.
//
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// 即将阻塞或者正在阻塞
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
// 如果当前节点node的前一个节点pred的waitStatus为SIGNAL,则返回true
//

return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
// >0 为cancelled,所以将其之前所有>0的节点都删除
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 尝试设置成 signal
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}


/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}



}


private void cancelAcquire(Node node) {
// Ignore if node doesn't exist
if (node == null)
return;

node.thread = null;
// 跳过取消的前驱节点
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;

// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
// 有效的前驱节点的后继节点.
Node predNext = pred.next;

// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
// 设置cancelled
node.waitStatus = Node.CANCELLED;

// 在末尾,删除自己
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.

int ws;
// 前驱节点不是头节点.
if (pred != head &&
// 当前节点的有效前驱是否为signal
((ws = pred.waitStatus) == Node.SIGNAL ||
// ws<=0 ,尝试设置ws为signal
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
// 当前节点的前驱线程信息是否为空
pred.thread != null) {
// 上述条件满足
Node next = node.next;
// 后继不为空
if (next != null && next.waitStatus <= 0)
//
compareAndSetNext(pred, predNext, next);
} else {
// 当前节点的前驱节点是头节点,或者上述条件不满足.唤醒当前节点的后继节点.
unparkSuccessor(node);
}

node.next = node; // help GC
}
}

独占式释放锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public final boolean release(int arg) {
// 调用自 tryRelease 方法尝试释放同步状态
if (tryRelease(arg)) {
// 释放成功,获取头节点
Node h = head;
// 存在头节点,并且waitStatus不是初始状态
// 在获取的过程中会将 waitStatus的值从初始状态更新成 SIGNAL 状态
if (h != null && h.waitStatus != 0)
// 解除线程挂起状态
unparkSuccessor(h);
return true;
}
return false;
}

/**
* Wakes up node's successor, if one exists.
* 实际唤醒节点的后继节点.
*/
private void unparkSuccessor(Node node) {

int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
// 获取后继节点.
Node s = node.next;
// 从尾节点向前找,找到队列第一个ws<0的
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
// <0,其实是signal
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
// 解除线程挂起状态
LockSupport.unpark(s.thread);
}
nmap
【源码阅读】reentrantLock
  1. 1. 目录
  2. 2. 简介
  3. 3. 重要的方法
    1. 3.1. 可重写的方法
    2. 3.2. 同步状态的相关方法
  4. 4. 源码浅析
    1. 4.1. 简介
    2. 4.2. node
    3. 4.3. 独占式加锁
    4. 4.4. 独占式释放锁
© 2023 haoxp
Hexo theme