Java AbstractQueuedSynchronizer源代码分析
AbstractQueuedSynchronizer
(以下简称为aqs)是Java中各种锁(Lock
)、信号量(Semaphore
)、互斥量(Mutex
)、栅栏(Barrier
)等实现的基础框架。该框架是基于变种的CLH锁
--一个FIFO的队列作为基础数据结构的。CLH锁具有较好的性能和扩展性。
aqs将资源的状态抽象为一个int变量,通过底层CAS
原子操作来改变其状态,具体暴露给子类的的函数有:getState
, setState
, compareAndSetState
三个。同时,子类只需要通过这几个函数来实现自己的逻辑,其他的像队列操作、线程阻塞等,aqs已经实现好了。具体需要实现的函数有tryAcquire
, tryRelease
, tryAcquireShared
, tryReleaseShared
, isHeldExclusively
。
aqs将资源的类型分为:独占(exclusive)和共享(shared)两种模式。因此,代码中所有的操作也有两种变体。队列的每个node也分为两种类型。子类需要实现的函数也分成了两类。
另外,aqs也支持对一个资源设置条件
锁:当条件满足时,才进入正常的锁竞争阶段。
底层数据结构-CLH锁的变体
如上图所示,aqs底层数据结构是一个双向链表,分别有prev
和next
两个指针指向前后的节点,并且每一个node
都有:一个status
字段用来标记该节点的状态,一个thread
变量,表示代表的线程,还有一个nextWaiter
,表示条件队列中下一个节点(这个后面会讲)。
其中status可以是下面的几种:
CANCELLED
:值为1,代表该thread等待超时或者被interrupt了;SIGNAL
:值为-1,表示该thread需要等待被唤醒('signal me at proper time'),在独占模式下,等待节点一般是这种情况;CONDITION
:值为-2,表示该thread是个条件等待节点,需要条件达到,才能进入正常的竞争队列;PROPAGATE
:值为-3,共享模式下的一种情况,后面会介绍;- 其他:初始化为0,刚新建的节点。
1.1 独占模式的获取资源
public final void acquire(int arg) { // 独占模式获取资源,忽略中断,调用tryAcquire至少一次
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这是独占模式的获取锁的主要函数。该函数忽略线程的interrupt信号。传入的参数可以忽略,只有在很少情况下才会用到,这里只是直接传给子函数的。tryAcquire
在上面,我们已经说过,是子类需要实现的核心逻辑,根据场景不同而异。如果该函数返回true,则表示当前线程获取资源成功了,那就不需要下面的步骤了,直接返回。如果没有成功,说明需要入队等待,也就是addWaiter
函数所做的事情,我们来看一下:
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
if (pred != null) { // 如果之前有节点在队列中,快速通道
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node); // 常规模式
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 必须初始化
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
先new一个独占类型的node,然后插入队列中:先测试快速模式,然后用一般的enq
进行插入。
enq函数采用类似自旋
的方式实现:先检查是否是个空队列(null==tail),然后按照一般模式插入,当然都要采用原子的方式(CAS)进行。从这里的代码,可以发现,head节点其实是个dummy节点。
虽然插入队列了,但是我们并没有设置该节点的waitStatus,为什么没有在初始化的时候一起设置了呢?
这是由于:队列中可能已经存在很多等待节点,每个节点的等待类型不同:有可能是已经cancel的、有条件等待刚被transfer进来的(后面会讲)、还有刚刚初始化(我们这种情况)插入的,还有已经设置为signal
(等待被唤醒)的。我们节点是应该要插入signal节点的后面,如果没有这种节点,则将head节点设置成这个节点,并设置成signal。
下面看一下具体代码:
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor(); // 每次都要重新获取新的前置节点
if (p == head && tryAcquire(arg)) { // 每次都需要重新检查是否可以获取资源了
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted; // 只有在这里才能退出该函数
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 这里block该线程的执行,并检查是否被interrupt过
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node); // 异常情况会到这
}
}
也是采用类似自旋的方式实现。每次重新获取前置节点和检查是否能获取资源的原因是:在多个线程竞争的情况下,由于编译器优化导致的指令重排或者其他线程可能释放资源,前一轮循环的环境状态已经发生变化了,所以在park
线程前需要重新检查,尽量避免不必要的park和unpark。
该函数结束有两种情况:1. 当前线程得到了竞争的资源(p == head && tryAcquire(arg));2. 执行过程中发生了异常,这个异常会传递到上层的代码中。
下面来看一下shouldParkAfterFailedAcquire
函数,可以形象的叫做找车位
函数:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL) // 之前已经有节点在排队等了,那我们也只能等了
return true;
if (ws > 0) {
do { // 前置节点是个cancel节点,那就需要寻找非cancel的第一个节点
node.prev = pred = pred.prev; //
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// waitStatus肯定是0或者propagate,先暂时不要返回true来block该线程,
// 下一次循环会再次试图获取(tryAcquire),外循环“自旋”
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);// 只是标记一下pred为signal,无所谓成功与否,因为外面有个“自旋”
}
return false;
}
该函数其实是给node几点寻找真正要'park'的位置。总体逻辑就是寻找可以'park'的节点,然后把当前节点放在它后面。所谓可以park,就是该节点是个“等待被唤醒”的节点。如果没有这样的前置节点,即节点的status可能是0或者'PROPAGATE',这种情况,设置为signal节点,等待下一轮操作(在一个自旋代码块中)。
如果确实需要'park',则进入parkAndCheckInterrupt
函数:
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
这里,利用LockSupport
提供的park
和unpark
来suspend和resume一个线程,代替Thread.suspend
和Thread.resume
,因为后者会产生死锁,已经被JDK1.7中标记为deprecated了。
流程图
- 1. 调用tryAcquire试图获取资源,如果成功,则返回;否则进入第2步;
- 2. 可能需要排队。先new一个exclude模式的node,并addWaiter插入队列中,进入自旋代码块;
- 3.1 检查pred是否是head,并再试一次tryAcquire。如果都成功,setHead返回;否则下一步;
- 3.2 查看是否需要立即park,如果是,则block当前线程并检查中断标记;否则自旋,进入3.1;
- 3.3 当其他线程唤醒该线程时(unpaarkSuccessor),从blocked状态恢复执行,进入3.1;
- 4. 在第三步中如果有异常,则cancel,设置CANCELLED。
几点说明
- 1. head节点中的线程(如果不为null)持有竞争的资源;
- 2. 在park这步中,每次更新了节点的信息,为了减少不必要的操作(park和unpark)都会尝试再次获取资源,如果成功就不会block;
- 3. 新加入一个节点时,并没有设置其waitStatus(为0),只有在其后面再插入其他节点时,才会设置为signal;
- 4. 在整个过程中,需要记录interrupt状态,在获取资源成功后,需要重新interrupt该线程;
1.2 独占模式的释放资源
在上面的获取资源的代码中,我们可以知道:如果一个资源竞争得到资源了,它会被设置到头节点,因此,我们释放时,直接从head开始检查。代码如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
首先,调用子类的tryRelease
做相关的释放资源的工作,然后从head开始(当前的线程所在的node),寻找需要被唤醒的下一个节点。如果head的waitStatus为0,则是最后一个节点了,不需要寻找了,直接退出。如果不为0,则可能存在需要唤醒的其他节点。
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0) // 如果status小于0,设置为0,让其不要再参与了
compareAndSetWaitStatus(node, ws, 0); // 独占模式无所谓失败(共享模式会在调用该函数之前确保设置成功)
Node s = node.next; // 一般下一个就是需要的唤醒的节点,这是next字段存在的最大意义
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev) // 寻找第一个非cancel的节点
if (t.waitStatus <= 0)
s = t; // 注意,这里没有break,之前老以为是最后一个😅
}
if (s != null)
LockSupport.unpark(s.thread);
}
基本逻辑,是寻找下一个要唤醒的节点;大部分情况,直接使用next就可以拿到。少部分情况,需要从队尾向前找。
unpark之后,当前线程恢复执行,从之前的block点:在acquireQueued
的自旋循环里,开始。然后:如果是通过next找到的,则第一个if(p == head && tryAcquire(arg))中p == head
是成立的,后面再次试图获取资源;如果是后面的某个节点,则通过后面的shouldParkAfterFailedAcquire
继续。
1.3 独占类型acquire的其他变体
1.
acquireInterruptibly
函数从名字就可以看出,这个是可以interrupt的acquire函数。和acquire不同的地方在于,如果在整个获取资源的过程中:acquire函数会记录线程是否被interrupt过,然后在获取成功后,重新触发一次interrupt;而该函数则在有interrupt的时候,直接throw
InterruptedException
出来。所以,执行acquire函数的线程在获取资源的过程中,会忽略interrupt信号,直到获取成功后,再检查一下是否有过interrupt,如果有,则selfInterrupt
。而该函数则会直接抛出。代码片段:
// 主函数中不同的地方 if (Thread.interrupted()) throw new InterruptedException(); // doAcquireInterruptibly中和acquireQueued不同的地方 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException();
2.
tryAcquireNanos
函数该函数是acquire的超时版本,同时和acquireInterruptibly一样,会在收到interrupt信号时,直接throw异常出来。
代码片段:
// 和acquireQueued主要不同的地方,nanosTimeout时每次自旋的时候,计算的剩余时间 // spinForTimeoutThreshold则是为了优化设置的一个阈值,小于阈值时,直接自旋下次 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout);
1.4 cancelAcquire函数
private void cancelAcquire(Node node) {
if (node == null) // Ignore if node doesn't exist
return;
node.thread = null;
// Skip cancelled predecessors
Node pred = node.prev;
while (pred.waitStatus > 0) //寻找合法的前缀节点
node.prev = pred = pred.prev;
Node predNext = pred.next;
node.waitStatus = Node.CANCELLED; // 标记为cancelled
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) { // 如果是tail直接删除
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
pred.thread != null) {
Node next = node.next;
if (next != null && next.waitStatus <= 0) // 将next节点链接到pred的next上,链表的删除操作
compareAndSetNext(pred, predNext, next);
} else { // 前缀是head节点,则要唤醒后面的节点
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
这个函数会在acquire异常的时候触发,异常则来自于前面的两个函数acquireInterruptibly和tryAcquireNanos。实现逻辑也很简单:先找到合法前缀节点,标记节点为cancelled,然后:如果node是队尾,直接删除;如果是队列中第二个节点(pred是head),则唤醒后面的节点;其他情况,则把node的next节点链接到pred后面,其实就是删除该节点。
1.5 release操作
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
release操作非常简单:先调用子类的tryRelease做资源状态等的修改,然后唤醒下一个等待signal的线程。因为,持有资源的线程是head的next节点,所以从head开始寻找下一个有效节点即可。
1.6 独占类型总结
从上面的分析,可以看出next指针,用来快速的通知下一个要唤醒的节点,很大程度上是一种优化手段,并不依赖。至于,每个节点的waitStatus,我们这里只涉及到了为signal和0的情况,其他几种情况还没有碰到。分析源代码的时候,尽量两个函数作为一对,一起来分析,才能够更容易的理解。
2.1 共享类型的acquire操作
和独占模式一样:先调用子类的tryAcquireShared
函数,如果失败,则入队等待。该函数忽略interrupt。
tryAcquireShared函数的返回值:
>0
: 成功,还有剩余资源,后续线程还可以继续请求;
=0
: 成功,没有剩余资源,后续资源不能请求;
<0
: 失败;
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r); // 和独占模式不同的地方,可以多个节点设置head
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
可以看出和独占模式的逻辑基本类似,只有在成功获取资源时的处理不同,即函数setHeadAndPropagate
:
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 || // waitStatus < 0,表示上次有节点释放过资源,
(h = head) == null || h.waitStatus < 0) { // 但是没有找到等待的节点(通过propagate标记过)
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
其中,setHead和独占模式是一样的,标记当前正在占有资源的线程节点。不一样的地方在于这个propagate
。
propagate的意思是:当唤醒一个propagate节点时,需要继续唤醒其后续节点,将这种唤醒状态“传播”下去。propagate只会设置在head节点上(参见下面的doReleaseShared代码)。
2.2 共享模式的release操作
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // 唤醒在等待signal的后续节点
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 标记为propagate,并跳过本轮,等待下一轮
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
类似独占模式,releaseShared
先调用子类的tryRelease,如果成功,则调用上面的函数,来操作队列。
- 1. 当head的waitStatus是
SIGNAL
时,和独占模式一样,说明后续节点正在等待唤醒,遂唤醒之; - 2. 当head的waitStatus是
0
时,这种情况发生在:第一次试图tryAcquire时失败后,初始化一个node,并addWaiter到队尾;进入自旋:第二次(或者第三次)检查tryAcquire成功,这时候setHead,就会出现这种情况。因此,这种情况下,该节点已经获取资源了,不需要等待signal,在release时应该标记一下并跳过本轮循环。
2.3 为什么需要PROPAGATE这个waitStatus
这是因为:唤醒等待节点,只会发生在head节点的waitStatus为signal时,这是独占和共享模式相同的原则;
但是,独占模式和共享模式不同的地方在于:共享模式,可以多个节点同时设置head,表示自己占有资源。这样便会发生head的waitStatus为0的情况,然后在release时,对于这种情况的处理方式就是做一个标记(propagate),表示有其他节点曾经释放过资源,如果下一次获取资源时,碰到这种waitsStatus的head时,就可以直接进行release,唤醒下一个等待signal的节点,而不是等到下一次显式的调用releaseShared。
3.1 对条件的支持
aqs提供了一个ConditionObject
的类,来实现Lock
接口,提供类似Object的wait和notify的操作,进而实现在一个锁上的条件等待和唤醒。
在其内部,aqs同样使用了一个队列,叫做条件队列
。里面的每个节点的waitStatus为condition。我们不妨将前面的队列称为同步队列
,以做区分。所有在同一个条件上等待的线程都同步在这个条件队列上,当条件达成时,这个条件队列的等待最久的那个(第一个)会被移到同步队列中,然后就和之前的独占模式一样了。
3.2 ConditionObject的await操作
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); // new 一个condition status node,并加入“条件队列”
int savedState = fullyRelease(node); // 释放资源,并保存之前资源状态
int interruptMode = 0;
while (!isOnSyncQueue(node)) { // 自旋:block当前线程,直到被唤醒或者有中断
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) // 再次竞争资源
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0) // 如果之前有中断,再次throw
reportInterruptAfterWait(interruptMode);
}
该函数实现了对某个条件的等待,同时在等待时如果有中断发生时,会在结束时throw出来。具体步骤如下:
- 1. 检查当前线程是否有中断,如果有直接throw出来;
- 2. 保存之前资源的同步状态,同时释放锁(fullyRelease);
- 3. 进入自旋,并block当前线程。退出条件是:当前node被移到同步队列或者检测到有中断;
- 4. 试图重新获取锁,同时如果之前有中断发生过,重新throw;
说明:这里的“中断”,是取消等待某个条件达成,不是取消竞争资源。所以,在checkInterruptWhileWaiting中会将该node的status设置为0,并加入同步队列,参与竞争。
3.3 ConditionObject的signal操作
private void doSignal(Node first) { // 传入的参数是第一个等待条件的节点
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false; // 如果设置失败,证明该节点已经失效,被cancel了,直接返回false
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 前置节点cancel或者设置失败,重新唤醒该线程
LockSupport.unpark(node.thread); // 让其进入自旋,参与竞争
return true;
}
signal操作主要是上面两个函数实现:从条件队列的第一个节点,向后找到第一个能被成功移到同步队列的节点,同时根据情况唤醒该节点。
4. 总结
本文从源代码实现的角度,分别分析了aqs框架的独占模式、共享模式以及条件是如何实现的。着重对其“获取”和“释放”进行了原理分析。对后面分析各种同步工具做了基础性的铺垫。