什么是AQS AQS全名为AbstractQueuedSynchronizer,即队列同步器。
它创始于JDK1.5,作者是很牛的Doug Lea,也是J.U.C的作者。
它是构建Lock、CountDownLatch等同步组件的基础框架。
常见的Lock实现,比如重入锁,读写锁等,内部都聚合一个AQS,一般会以内部类的形式出现。
我们在使用Lock提供的API时,其内部会适配到AQS的抽象方法上去。
AQS同步的实现较为复杂,内含了同步状态的管理、线程队列、等待、通知、唤醒等一系列底层操作。
而我们只需要关注Lock的API,如何去真正实现同步的则交由AQS就可以了。
通过AQS,我们可以实现各种定制化的线程同步需求,将锁以及其他同步组件的实现难度降低了。
为何要使用AQS java不是提供了synchronized关键字么,不是也可以实现锁么?它不够香么?主要原因有以下几点:
性能方面
早期的JDK(JDK 1.6以前),synchronized的真正实现完全依赖于系统机制(monitor模式),造成频繁的用户态到内核态的转变,性能较差,而从1.6开始,JDK对synchronized进行了优化,为我们带来了锁升级机制 。
与synchronized不同的是,AQS不仅使用了系统级API,还将一部分工作交给了JAVA层面,保持了用户态,它维护了一个FIFO同步队列。
功能方面
synchronized控制力度较粗,没有办法设置超时,重试,也没有办法相应中断,而AQS可以。正因为AQS控制力度更细,所以成为了各种同步组件的基础,实现了五花八门的功能。而要是用synchronized来实现的话属实乏力。
如何使用AQS 先不管AQS底层细节,需要知道的是:AQS依赖一个volatile的state来管理同步状态
以及几个常用的API。还有其他的不一一例举。
需要我们重写的API
方法名
描述
protected boolean tryAcquire(int arg)
尝试设置state,设置成功则代表获取锁,此时AQS被置为了占用状态。
protected boolean tryRelease(int arg)
尝试设置state,设置成功则代表释放锁,此时AQS被置为空闲状态。
protected int tryAcquiredShared(int arg)
共享式获取同步状态,如果返回值大于0则表示获取成功,小于0则获取失败。
protected boolean tryReleaseShared(int arg)
共享式释放同步状态
需要我们调用的模板方法
方法名
描述
public final void acquire(int arg)
独占式获取同步状态,如果获取失败,则会将当前线程加入同步队列等待。该方法会调用tryAcquire(int arg)
public final void acquireInterruptibly(int arg)
和acquire类似,但是会响应中断。如果当前线程被中断会抛出InterruptedException返回。
public final boolean release(int arg)
独占式释放同步状态。会调用tryRelease(),成功修改同步状态后将会唤醒头节点的后继节点。
public final void acquireShared(int arg)
共享式获取同步状态,如果获取失败,则会将当前线程加入同步队列等待。该方法会调用tryAcquireShared(int arg)。同一时刻,可以有多个线程获取同步状态,state判断是大于0或小于0,而独占式只能有一个线程获取占用,tryAcquire返回的是boolean,cas要么成功要么失败。
public final void acquireSharedInterruptibly(int arg)
和acquireShared类似,可以响应中断,抛出InterruptedException并返回。
public final releaseShared(int arg)
共享式释放同步状态。
AQS实现锁,当然要告诉每个线程,他们是否获取到了锁。state就是一个标识位,当前线程调用AQS时会检查当前state是否满足预期,来判断自己是否可以继续执行,还是说进入队列等待。
一般使用AQS时,我们会把它聚合到我们自己的类当中,作为一个内部类来使用。
如下可以实现一个互斥锁。
定义互斥锁,实现Lock接口
聚合AQS
维护AQS的state
因为可能会有竞争出现,所以需要CAS来对state进行更新
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 public class Mutex implements Lock , java.io.Serializable { private static class Sync extends AbstractQueuedSynchronizer { protected boolean isHeldExclusively () { return getState() == 1 ; } public boolean tryAcquire (int acquires) { if (compareAndSetState(0 , 1 )) { setExclusiveOwnerThread(Thread.currentThread()); return true ; } return false ; } protected boolean tryRelease (int releases) { if (getState() == 0 ) throw new IllegalMonitorStateException (); setExclusiveOwnerThread(null ); setState(0 ); return true ; } Condition newCondition () { return new ConditionObject (); } } private final Sync sync = new Sync (); public void lock () { sync.acquire(1 ); } public boolean tryLock () { return sync.tryAcquire(1 ); } public void unlock () { sync.release(1 ); } public Condition newCondition () { return sync.newCondition(); } public boolean isLocked () { return sync.isHeldExclusively(); } public boolean hasQueuedThreads () { return sync.hasQueuedThreads(); } public void lockInterruptibly () throws InterruptedException { sync.acquireInterruptibly(1 ); } public boolean tryLock (long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireNanos(1 , unit.toNanos(timeout)); } }
AQS内部实现逻辑 AQS内部主要办了四件事:
维护共享同步状态state
为获取到同步状态的线程加入FIFO同步队列
阻塞其他未获取同步状态的线程
释放时唤醒后继节点
结合起来,一般流程就是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 acquire(){ if(AQS的实现类没有获取到同步状态){ 生成节点 [ 加入队列 节点循环检查 ] 阻塞当前线程 } } release(){ if(设置同步状态成功){ 设置头节点状态 唤醒后继节点来竞争 } }
同步队列 AQS维护了一个FIFO同步队列。
类似于CLH锁,该队列每个节点维护了一个线程,每个节点会自旋检查状态是否可以获取锁。头节点是当前获取到锁的线程节点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 static final class Node { static final Node SHARED = new Node (); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared () { return nextWaiter = = SHARED; } final Node predecessor () throws NullPointerException { Node p = prev; if (p == null ) throw new NullPointerException (); else return p; } Node() { } Node(Thread thread, Node mode) { this .nextWaiter = mode; this .thread = thread; } Node(Thread thread, int waitStatus) { this .waitStatus = waitStatus; this .thread = thread; } }
Node属性以及方法描述:
每个node都含有其前驱节点和后继节点,分别为prev 与next 。
当前节点维护的线程即为thread 。
nextWaiter 与condition队列有关,本次暂不讨论。
重点关注waitStatus 的几个状态:
状态名
状态值
状态描述
CANCELLED
1
该节点由于超时或中断而被取消。该状态不会再转变为其他状态,而且该节点的线程再也不会被阻塞
INITIAL
0
初始状态
SIGNAL
-1
其后继节点需要被唤醒,即该线程被释放或被取消时,必须唤醒其后继节点
CONDITION
-2
表示该节点的线程在条件队列中等待,而非在同步队列中。当其他线程对Condition调用了signal()方法后,该节点会被转移到同步队列中参与资源竞争
PROPAGATE
-3
只有共享模式下才会用到,为无条件传播状态。下一次共享式同步状态获取将会无条件被传播下去。共享锁会有很多线程获取到锁或者释放锁,所以有些方法是并发执行的,就会产生很多中间状态,而PROPAGATE就是为了让这些中间状态不影响程序的正常运行。
独占式 贴一张网上copy的图:
锁的获取 acquire(int arg) 该方法如果执行成功则表示当前线程获取到了锁。
1 2 3 4 5 6 7 8 9 public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
addWaiter(Node mode) 如果竞争锁失败,则会需要通过addWaiter生成node。
node有两种,Node.EXCLUSIVE 用于独占,Node.SHARED 用于共享。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 private Node addWaiter (Node mode) { Node node = new Node (Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
enq(Node node) 生成CLH节点,并设置前驱与后继,形成CLH。这里生成尾节点时需要循环CAS的方式保证原子性。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node ())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
acquireQueued(final Node node, int arg) 生成node后,通过acquireQueued来维护队列,每个节点通过自旋方式检查前驱节点状态,以及维护前驱与自身的状态。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } }
只有前驱节点是头节点的线程才能去获取AQS的state同步状态。原因有两点:
头节点为正在执行的线程节点。如果头节点执行完毕,那么将会释放同步状态(见后面的release方法详解),并唤醒后继节点。
后继节点被唤醒后,继续执行acquireQueued方法的自旋逻辑,判断前驱节点是否是头节点,是的话去tryAcquire。
实现FIFO,最先加入队列的节点第一个被通知来竞争锁。竞争成功后node会被后面的shouldParkAfterFailedAcquire方法设置waitStatus为SIGNAL。
setHead(Node node) 1 2 3 4 5 6 private void setHead (Node node) { head = node; node.thread = null ; node.prev = null ; }
shouldParkAfterFailedAcquire(Node pred, Node node) 当CLH的非头节点自旋检测前驱节点状态时,需要判断自身是否需要阻塞,毕竟老是自旋会非常消耗CPU。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; }
以上就是独占式获取锁的流程,接下来讲释放锁。
锁的释放 release(int arg) 步骤不多,主要是两步
调用子类实现的tryRelease方法,判断是否需要去释放掉队列头节点
如果头节点不为空且头节点不是初始化状态,那么会唤醒后继节点。
1 2 3 4 5 6 7 8 9 10 11 12 public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
具体看下unparkSuccessor的实现。
unparkSuccessor(Node node) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
共享式 共享式与独占式的不同在于,独占式只允许一个线程获取到资源,其他线程直接阻塞,如ReentrantLock,在调用lock()方法后,其他线程无法执行lock后续的逻辑。而共享式则可以有多个线程同时访问共享资源。
以常用的CountDownLatch为例:
只有调用await()时主线程才会阻塞,实际上调用await()也就是再调用AQS的acquireSharedInterruptibly()。
而其他所有线程都可以同时对一个共享变量进行操作。等待所有其他线程处理完毕后,再执行await()的后续操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 public class CountDownLatch { private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L ; Sync(int count) { setState(count); } int getCount () { return getState(); } protected int tryAcquireShared (int acquires) { return (getState() == 0 ) ? 1 : -1 ; } protected boolean tryReleaseShared (int releases) { for (; ; ) { int c = getState(); if (c == 0 ) return false ; int nextc = c - 1 ; if (compareAndSetState(c, nextc)) return nextc = = 0 ; } } } private final java.util.concurrent.CountDownLatch.Sync sync; public CountDownLatch (int count) { if (count < 0 ) throw new IllegalArgumentException ("count < 0" ); this .sync = new java .util.concurrent.CountDownLatch.Sync(count); } public void await () throws InterruptedException { sync.acquireSharedInterruptibly(1 ); } public void countDown () { sync.releaseShared(1 ); } }
主要看下await()方法,也其实就是acquireSharedInterruptibly的实现。
锁的获取 acquireSharedInterruptibly(int arg) 1 2 3 4 5 6 7 8 9 public final void acquireSharedInterruptibly (int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException (); if (tryAcquireShared(arg) < 0 ) doAcquireSharedInterruptibly(arg); }
doAcquireSharedInterruptibly(int arg) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private void doAcquireSharedInterruptibly (int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true ; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0 ) { setHeadAndPropagate(node, r); p.next = null ; failed = false ; return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException (); } } finally { if (failed) cancelAcquire(node); } }
锁的释放 CountDownLatch中调用的是countDown(),代表一个线程执行完毕了,需要扣去AQS的state值。
实际上也是调用和独占式release(int arg)类似的一个方法:releaseShared(int arg)。
releaseShared(int arg) 1 2 3 4 5 6 7 8 9 public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; }
doReleaseShared() 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } }
从网上copy的一张图。有空再自己画一遍加深印象。
思考
AQS如何保证的内存可见性?
acquire()流程为何多次调用了tryAcquire()?
为何监测waitStatus总是从队列尾部向头部检测?