Java并发-AQS解析

什么是AQS

AQS全名为AbstractQueuedSynchronizer,即队列同步器。

它创始于JDK1.5,作者是很牛的Doug Lea,也是J.U.C的作者。

它是构建Lock、CountDownLatch等同步组件的基础框架。

image-20220105205641626

常见的Lock实现,比如重入锁,读写锁等,内部都聚合一个AQS,一般会以内部类的形式出现。

image-20220105202646793

我们在使用Lock提供的API时,其内部会适配到AQS的抽象方法上去。

image-20220105203116151

AQS同步的实现较为复杂,内含了同步状态的管理、线程队列、等待、通知、唤醒等一系列底层操作。

而我们只需要关注Lock的API,如何去真正实现同步的则交由AQS就可以了。

通过AQS,我们可以实现各种定制化的线程同步需求,将锁以及其他同步组件的实现难度降低了。

为何要使用AQS

java不是提供了synchronized关键字么,不是也可以实现锁么?它不够香么?主要原因有以下几点:

  • 性能方面

    早期的JDK(JDK 1.6以前),synchronized的真正实现完全依赖于系统机制(monitor模式),造成频繁的用户态到内核态的转变,性能较差,而从1.6开始,JDK对synchronized进行了优化,为我们带来了锁升级机制

    与synchronized不同的是,AQS不仅使用了系统级API,还将一部分工作交给了JAVA层面,保持了用户态,它维护了一个FIFO同步队列。

  • 功能方面

    synchronized控制力度较粗,没有办法设置超时,重试,也没有办法相应中断,而AQS可以。正因为AQS控制力度更细,所以成为了各种同步组件的基础,实现了五花八门的功能。而要是用synchronized来实现的话属实乏力。

如何使用AQS

先不管AQS底层细节,需要知道的是:AQS依赖一个volatile的state来管理同步状态

以及几个常用的API。还有其他的不一一例举。

需要我们重写的API

方法名 描述
protected boolean tryAcquire(int arg) 尝试设置state,设置成功则代表获取锁,此时AQS被置为了占用状态。
protected boolean tryRelease(int arg) 尝试设置state,设置成功则代表释放锁,此时AQS被置为空闲状态。
protected int tryAcquiredShared(int arg) 共享式获取同步状态,如果返回值大于0则表示获取成功,小于0则获取失败。
protected boolean tryReleaseShared(int arg) 共享式释放同步状态

需要我们调用的模板方法

方法名 描述
public final void acquire(int arg) 独占式获取同步状态,如果获取失败,则会将当前线程加入同步队列等待。该方法会调用tryAcquire(int arg)
public final void acquireInterruptibly(int arg) 和acquire类似,但是会响应中断。如果当前线程被中断会抛出InterruptedException返回。
public final boolean release(int arg) 独占式释放同步状态。会调用tryRelease(),成功修改同步状态后将会唤醒头节点的后继节点。
public final void acquireShared(int arg) 共享式获取同步状态,如果获取失败,则会将当前线程加入同步队列等待。该方法会调用tryAcquireShared(int arg)。同一时刻,可以有多个线程获取同步状态,state判断是大于0或小于0,而独占式只能有一个线程获取占用,tryAcquire返回的是boolean,cas要么成功要么失败。
public final void acquireSharedInterruptibly(int arg) 和acquireShared类似,可以响应中断,抛出InterruptedException并返回。
public final releaseShared(int arg) 共享式释放同步状态。

AQS实现锁,当然要告诉每个线程,他们是否获取到了锁。state就是一个标识位,当前线程调用AQS时会检查当前state是否满足预期,来判断自己是否可以继续执行,还是说进入队列等待。

一般使用AQS时,我们会把它聚合到我们自己的类当中,作为一个内部类来使用。

如下可以实现一个互斥锁。

  1. 定义互斥锁,实现Lock接口

  2. 聚合AQS

  3. 维护AQS的state

    因为可能会有竞争出现,所以需要CAS来对state进行更新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// 1. 定义互斥锁,实现Lock接口。这里互斥锁是一种独占锁。
public class Mutex implements Lock, java.io.Serializable {

// 2. 聚合AQS
private static class Sync extends AbstractQueuedSynchronizer {
// 如果同步状态state为1,那么则判定为被当前线程占用。
// 当然,可以是其他数字,按自己爱好或者需要来即可,只要上下文能对的上。
protected boolean isHeldExclusively() {
return getState() == 1;
}

// 定义尝试获取锁,具体怎么样算获取到锁?我们自己定义。但是AQS规定了,抢到了返回true,没抢到要返回false。
public boolean tryAcquire(int acquires) {
// 比如这里,将state通过cas方式更新为1则判定抢到了锁。
if (compareAndSetState(0, 1)) {
// 抢到锁可以调用父类的setExclusiveOwnerThread表明这个锁被该线程独占了。
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
// 没抢到?返回false
return false;
}

// 定义释放锁,我们约定,如果state是0,那么就算是释放状态。
protected boolean tryRelease(int releases) {
// 当然,如果本来就不是占用锁的线程,是不能被释放的,这里抛出异常。
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
// 设置state为0。直到释放完毕,另一个线程就可以通过tryAcuqire来获取到锁了。
// 这里不需要cas,也不需要原子操作。没有必要,能释放掉就行。
setState(0);
return true;
}

// 提供一个Condition
Condition newCondition() {
return new ConditionObject();
}

}

// 同步器隐藏了复杂的实现,lock只需要调用sync即可。
private final Sync sync = new Sync();

public void lock() {
sync.acquire(1);
}

public boolean tryLock() {
return sync.tryAcquire(1);
}

public void unlock() {
sync.release(1);
}

public Condition newCondition() {
return sync.newCondition();
}

public boolean isLocked() {
return sync.isHeldExclusively();
}

public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}

public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}

AQS内部实现逻辑

AQS内部主要办了四件事:

  • 维护共享同步状态state
  • 为获取到同步状态的线程加入FIFO同步队列
  • 阻塞其他未获取同步状态的线程
  • 释放时唤醒后继节点

结合起来,一般流程就是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
acquire(){
if(AQS的实现类没有获取到同步状态){
生成节点
[
加入队列
节点循环检查
]
阻塞当前线程
}
}

release(){
if(设置同步状态成功){
设置头节点状态
唤醒后继节点来竞争
}
}

同步队列

AQS维护了一个FIFO同步队列。

类似于CLH锁,该队列每个节点维护了一个线程,每个节点会自旋检查状态是否可以获取锁。头节点是当前获取到锁的线程节点。

image-20220105211238209

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
static final class Node {

static final Node SHARED = new Node();

static final Node EXCLUSIVE = null;

static final int CANCELLED = 1;

static final int SIGNAL = -1;

static final int CONDITION = -2;

static final int PROPAGATE = -3;

volatile int waitStatus;

volatile Node prev;

volatile Node next;

volatile Thread thread;

Node nextWaiter;

final boolean isShared() {
return nextWaiter == SHARED;
}

final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}

Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

Node属性以及方法描述:

每个node都含有其前驱节点和后继节点,分别为prevnext

当前节点维护的线程即为thread

nextWaiter与condition队列有关,本次暂不讨论。

重点关注waitStatus的几个状态:

状态名 状态值 状态描述
CANCELLED 1 该节点由于超时或中断而被取消。该状态不会再转变为其他状态,而且该节点的线程再也不会被阻塞
INITIAL 0 初始状态
SIGNAL -1 其后继节点需要被唤醒,即该线程被释放或被取消时,必须唤醒其后继节点
CONDITION -2 表示该节点的线程在条件队列中等待,而非在同步队列中。当其他线程对Condition调用了signal()方法后,该节点会被转移到同步队列中参与资源竞争
PROPAGATE -3 只有共享模式下才会用到,为无条件传播状态。下一次共享式同步状态获取将会无条件被传播下去。共享锁会有很多线程获取到锁或者释放锁,所以有些方法是并发执行的,就会产生很多中间状态,而PROPAGATE就是为了让这些中间状态不影响程序的正常运行。

独占式

贴一张网上copy的图:

diagram-of-acquire

锁的获取

acquire(int arg)

该方法如果执行成功则表示当前线程获取到了锁。

1
2
3
4
5
6
7
8
9
public final void acquire(int arg) {
// 调用子类的tryAcquire具体实现,如果成功则当前线程获取到锁,失败则生成Node并加入队列。
if (!tryAcquire(arg) &&
// 生成Node并加入队列尾部,队列中的node引用的线程会出现阻塞和唤醒。
// 如果唤醒时tryAcquire获取同步状态成功,则退出队列。
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
// 如果能进入这里,那么acquireQueued返回的就是true。
selfInterrupt();
}
addWaiter(Node mode)

如果竞争锁失败,则会需要通过addWaiter生成node。

node有两种,Node.EXCLUSIVE 用于独占,Node.SHARED 用于共享。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 将当前争用锁的线程生成对应的node,并设置prev与next节点。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// 先获取队列当前的尾节点。
Node pred = tail;
// 如果尾节点不为空,,将并后继设置为当前线程node。
if (pred != null) {
// 则将当前线程节点node的前驱节点设置为原尾节点node
node.prev = pred;
// CAS方式将当前尾节点node的后继节点更新为当前线程节点node。
// 源码表述为:Try the fast path of enq; backup to full enq on failure
// 简单尝试设置一次,成功直接返回,失败则进入enq进行较复杂逻辑。这不是原子操作!
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 如果原本一个节点都没有,或者是第一次尝试设置尾节点时CAS失败了,则会走enq方法。
enq(node);
return node;
}
enq(Node node)

生成CLH节点,并设置前驱与后继,形成CLH。这里生成尾节点时需要循环CAS的方式保证原子性。

image-20220105222649483

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 一个典型的无限循环CAS,也就是实现一个原子操作。是的尾节点的设置具有原子性,不会出错。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
// 如果当前原有的尾节点为空,也就是说一个节点都没有时,就一定要初始化一个节点作为头节点。
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
// 头节点生成成功,继续for循环,这次将会走else部分。
} else {
// 如果当前存在尾节点,那么就如同之前addWaiter中的操作,设置相关前驱和后继。
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
// 设置尾节点成功才会退出无限循环。
return t;
}
}
}
}
acquireQueued(final Node node, int arg)

生成node后,通过acquireQueued来维护队列,每个节点通过自旋方式检查前驱节点状态,以及维护前驱与自身的状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 队列操作,无限循环校验当前node是否有资格竞争锁以及是否需要阻塞。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
// 获取当前node的前驱节点
final Node p = node.predecessor();
// 如果前驱节点是头节点了,那么他就有资格去tryAcquire竞争锁,否则老老实实继续排队。
if (p == head && tryAcquire(arg)) {
// tryAcquire竞争成功,那么就删除原来的头节点(p.next = null去除头节点多余引用来帮助GC),
// 并将该node设置为头节点。setHead实现很简单,可以看下文。
setHead(node);
p.next = null; // help GC
failed = false;
// 这里返回false,线程不会被acquire方法中的selfInterrupt()中断
return interrupted;
}
// 如果没有抢到锁,则要判断这个节点的线程是否需要阻塞以及是否已经中断。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
// 如果执行到这,那么node在排队的时候被检测到中断了。
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

只有前驱节点是头节点的线程才能去获取AQS的state同步状态。原因有两点:

  1. 头节点为正在执行的线程节点。如果头节点执行完毕,那么将会释放同步状态(见后面的release方法详解),并唤醒后继节点。

    后继节点被唤醒后,继续执行acquireQueued方法的自旋逻辑,判断前驱节点是否是头节点,是的话去tryAcquire。

  2. 实现FIFO,最先加入队列的节点第一个被通知来竞争锁。竞争成功后node会被后面的shouldParkAfterFailedAcquire方法设置waitStatus为SIGNAL。

setHead(Node node)
1
2
3
4
5
6

private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
shouldParkAfterFailedAcquire(Node pred, Node node)

当CLH的非头节点自旋检测前驱节点状态时,需要判断自身是否需要阻塞,毕竟老是自旋会非常消耗CPU。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// 如果前驱节点的waitStatus是SIGNAL,值为-1,则表示前驱节点线程正在执行,后续节点都需要阻塞。
// 于是直接返回true表示当前线程需要阻塞。
return true;
if (ws > 0) {
// 如果前驱节点的waitStatus大于0,比如CANCELLED为1,则表示前驱线程已经取消了,
// 就需要把当前节点node的前驱节点前移,跳过已取消的节点,并再次判断状态。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 回想一下Node的waitStatus有哪几种状态?
// CANCELLED:1,INITIAL:0,SIGNAL:-1,CONDITION:-2,PROPAGATE:-3
// 已经排除了-1 和 1,且不会是condition状态下,所以只会是0和-3,也就是INITIAL和PROPAGATE
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
// 不论获取成功和失败,继续循环CAS,当然,如果成功了,下次循环就不会执行这块else,而是第一个if,
// 返回true。当前线程将会被阻塞。
}
return false;
}

以上就是独占式获取锁的流程,接下来讲释放锁。

锁的释放

release(int arg)

步骤不多,主要是两步

  1. 调用子类实现的tryRelease方法,判断是否需要去释放掉队列头节点
  2. 如果头节点不为空且头节点不是初始化状态,那么会唤醒后继节点。

image-20220105222956649

1
2
3
4
5
6
7
8
9
10
11
12
public final boolean release(int arg) {
// 交由子类实现,子类可以判断AQS中的同步状态state的值是否满足预期,设置新的state并让tryRelease返回true或false。
// true则为释放成功,false则为释放失败。
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 唤醒后继节点
unparkSuccessor(h);
return true;
}
return false;
}

具体看下unparkSuccessor的实现。

unparkSuccessor(Node node)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private void unparkSuccessor(Node node) {
// 如果头节点waitStatus为取消状态,那么就需要对其置为初始化。
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
// 先直接查看后继节点,如果后继节点被取消或者是空,那么从队列尾部向前检测,是否有node的状态为初始化或者是SIGNAL。
// 如果有符合条件的后继,那么唤醒它,继续执行acquireQueued中的自旋来竞争锁。
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

共享式

共享式与独占式的不同在于,独占式只允许一个线程获取到资源,其他线程直接阻塞,如ReentrantLock,在调用lock()方法后,其他线程无法执行lock后续的逻辑。而共享式则可以有多个线程同时访问共享资源。

以常用的CountDownLatch为例:

只有调用await()时主线程才会阻塞,实际上调用await()也就是再调用AQS的acquireSharedInterruptibly()。

而其他所有线程都可以同时对一个共享变量进行操作。等待所有其他线程处理完毕后,再执行await()的后续操作。

image-20220105224258395

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class CountDownLatch {
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}

// 如果当前state值不为零,则可以获取这个共享锁,返回复数-1,否则coutDown结束了,返回正数1
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
// 每次对state减去1,直到减到0,返回true,锁被释放。
// 这里需要循环CAS实现原子操作。
for (; ; ) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

private final java.util.concurrent.CountDownLatch.Sync sync;
// 初始化一个count作为state值,当这个值被countDown消耗完,锁就会被释放。
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new java.util.concurrent.CountDownLatch.Sync(count);
}

// 阻塞线程
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 每次release掉一个
public void countDown() {
sync.releaseShared(1);
}
}

主要看下await()方法,也其实就是acquireSharedInterruptibly的实现。

锁的获取

acquireSharedInterruptibly(int arg)
1
2
3
4
5
6
7
8
9
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 重点在这里,会调用子类的tryAcquireShared,如果小于0,则会阻塞。
if (tryAcquireShared(arg) < 0)
// 实际的阻塞操作,会生成共享式节点加入队列中。
doAcquireSharedInterruptibly(arg);
}
doAcquireSharedInterruptibly(int arg)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 和独占式相同的是,都是调用addWaiter方法,生成节点加入队列。
// 和独占式不同的是这里是共享节点。
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 老样子,循环CAS来维护节点
// 和独占式相同的是,一样是必须前驱节点是头节点才可以tryAcquireShared
// 和独占式不同的是,如果成功获取到了(tryAcquireShared返回值大于等于0),调用的是setHeadAndPropagate。
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 这里获取锁失败了,继续阻塞,和独占式相同
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

锁的释放

CountDownLatch中调用的是countDown(),代表一个线程执行完毕了,需要扣去AQS的state值。

实际上也是调用和独占式release(int arg)类似的一个方法:releaseShared(int arg)。

releaseShared(int arg)
1
2
3
4
5
6
7
8
9
public final boolean releaseShared(int arg) {
// 子类中实现,CountDownLatch中每次都会从state中扣除arg的值。
if (tryReleaseShared(arg)) {
// 释放共享状态成功,则执行释放同步
doReleaseShared();
return true;
}
return false;
}
doReleaseShared()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

从网上copy的一张图。有空再自己画一遍加深印象。

img

思考

  1. AQS如何保证的内存可见性?
  2. acquire()流程为何多次调用了tryAcquire()?
  3. 为何监测waitStatus总是从队列尾部向头部检测?