Java AbstractQueuedSynchronizer源代码分析

AbstractQueuedSynchronizer(以下简称为aqs)是Java中各种锁(Lock)、信号量(Semaphore)、互斥量(Mutex)、栅栏(Barrier)等实现的基础框架。该框架是基于变种的CLH锁--一个FIFO的队列作为基础数据结构的。CLH锁具有较好的性能和扩展性。

aqs将资源的状态抽象为一个int变量,通过底层CAS原子操作来改变其状态,具体暴露给子类的的函数有:getState, setState, compareAndSetState三个。同时,子类只需要通过这几个函数来实现自己的逻辑,其他的像队列操作、线程阻塞等,aqs已经实现好了。具体需要实现的函数有tryAcquire, tryRelease, tryAcquireShared, tryReleaseShared, isHeldExclusively

aqs将资源的类型分为:独占(exclusive)和共享(shared)两种模式。因此,代码中所有的操作也有两种变体。队列的每个node也分为两种类型。子类需要实现的函数也分成了两类。

另外,aqs也支持对一个资源设置条件锁:当条件满足时,才进入正常的锁竞争阶段。

底层数据结构-CLH锁的变体

aqs数据结构图

如上图所示,aqs底层数据结构是一个双向链表,分别有prevnext两个指针指向前后的节点,并且每一个node都有:一个status字段用来标记该节点的状态,一个thread变量,表示代表的线程,还有一个nextWaiter,表示条件队列中下一个节点(这个后面会讲)。

其中status可以是下面的几种:

1.1 独占模式的获取资源

public final void acquire(int arg) { // 独占模式获取资源,忽略中断,调用tryAcquire至少一次
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

这是独占模式的获取锁的主要函数。该函数忽略线程的interrupt信号。传入的参数可以忽略,只有在很少情况下才会用到,这里只是直接传给子函数的。tryAcquire在上面,我们已经说过,是子类需要实现的核心逻辑,根据场景不同而异。如果该函数返回true,则表示当前线程获取资源成功了,那就不需要下面的步骤了,直接返回。如果没有成功,说明需要入队等待,也就是addWaiter函数所做的事情,我们来看一下:

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    if (pred != null) { // 如果之前有节点在队列中,快速通道
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node); // 常规模式
    return node;
}
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // 必须初始化
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

先new一个独占类型的node,然后插入队列中:先测试快速模式,然后用一般的enq进行插入。

enq函数采用类似自旋的方式实现:先检查是否是个空队列(null==tail),然后按照一般模式插入,当然都要采用原子的方式(CAS)进行。从这里的代码,可以发现,head节点其实是个dummy节点

虽然插入队列了,但是我们并没有设置该节点的waitStatus,为什么没有在初始化的时候一起设置了呢?

这是由于:队列中可能已经存在很多等待节点,每个节点的等待类型不同:有可能是已经cancel的、有条件等待刚被transfer进来的(后面会讲)、还有刚刚初始化(我们这种情况)插入的,还有已经设置为signal(等待被唤醒)的。我们节点是应该要插入signal节点的后面,如果没有这种节点,则将head节点设置成这个节点,并设置成signal。

下面看一下具体代码:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();  // 每次都要重新获取新的前置节点
            if (p == head && tryAcquire(arg)) { // 每次都需要重新检查是否可以获取资源了
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;   // 只有在这里才能退出该函数
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())  // 这里block该线程的执行,并检查是否被interrupt过
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);  // 异常情况会到这
    }
}

也是采用类似自旋的方式实现。每次重新获取前置节点检查是否能获取资源的原因是:在多个线程竞争的情况下,由于编译器优化导致的指令重排或者其他线程可能释放资源,前一轮循环的环境状态已经发生变化了,所以在park线程前需要重新检查,尽量避免不必要的park和unpark。

该函数结束有两种情况:1. 当前线程得到了竞争的资源(p == head && tryAcquire(arg));2. 执行过程中发生了异常,这个异常会传递到上层的代码中。

下面来看一下shouldParkAfterFailedAcquire函数,可以形象的叫做找车位函数:

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL) // 之前已经有节点在排队等了,那我们也只能等了
        return true;
    if (ws > 0) {
        do {    // 前置节点是个cancel节点,那就需要寻找非cancel的第一个节点
            node.prev = pred = pred.prev;  //
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
         // waitStatus肯定是0或者propagate,先暂时不要返回true来block该线程,
         // 下一次循环会再次试图获取(tryAcquire),外循环“自旋”
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);// 只是标记一下pred为signal,无所谓成功与否,因为外面有个“自旋”
    }
    return false;
}

该函数其实是给node几点寻找真正要'park'的位置。总体逻辑就是寻找可以'park'的节点,然后把当前节点放在它后面。所谓可以park,就是该节点是个“等待被唤醒”的节点。如果没有这样的前置节点,即节点的status可能是0或者'PROPAGATE',这种情况,设置为signal节点,等待下一轮操作(在一个自旋代码块中)。

如果确实需要'park',则进入parkAndCheckInterrupt函数:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

这里,利用LockSupport提供的parkunpark来suspend和resume一个线程,代替Thread.suspendThread.resume,因为后者会产生死锁,已经被JDK1.7中标记为deprecated了。

流程图

aqs acquire流程图

几点说明

1.2 独占模式的释放资源

在上面的获取资源的代码中,我们可以知道:如果一个资源竞争得到资源了,它会被设置到头节点,因此,我们释放时,直接从head开始检查。代码如下:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

首先,调用子类的tryRelease做相关的释放资源的工作,然后从head开始(当前的线程所在的node),寻找需要被唤醒的下一个节点。如果head的waitStatus为0,则是最后一个节点了,不需要寻找了,直接退出。如果不为0,则可能存在需要唤醒的其他节点。

private void unparkSuccessor(Node node) {
    int ws = node.waitStatus;
    if (ws < 0) // 如果status小于0,设置为0,让其不要再参与了
        compareAndSetWaitStatus(node, ws, 0); // 独占模式无所谓失败(共享模式会在调用该函数之前确保设置成功)
    Node s = node.next;  // 一般下一个就是需要的唤醒的节点,这是next字段存在的最大意义
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)  // 寻找第一个非cancel的节点
            if (t.waitStatus <= 0)
                s = t;  // 注意,这里没有break,之前老以为是最后一个😅
    }
    if (s != null)
        LockSupport.unpark(s.thread);
}

基本逻辑,是寻找下一个要唤醒的节点;大部分情况,直接使用next就可以拿到。少部分情况,需要从队尾向前找。

unpark之后,当前线程恢复执行,从之前的block点:在acquireQueued的自旋循环里,开始。然后:如果是通过next找到的,则第一个if(p == head && tryAcquire(arg))中p == head是成立的,后面再次试图获取资源;如果是后面的某个节点,则通过后面的shouldParkAfterFailedAcquire继续。

1.3 独占类型acquire的其他变体

1.4 cancelAcquire函数

private void cancelAcquire(Node node) {
    if (node == null) // Ignore if node doesn't exist
        return;
    node.thread = null;
    // Skip cancelled predecessors
    Node pred = node.prev;
    while (pred.waitStatus > 0) //寻找合法的前缀节点
        node.prev = pred = pred.prev;
    Node predNext = pred.next;
    node.waitStatus = Node.CANCELLED;  // 标记为cancelled
    // If we are the tail, remove ourselves.
    if (node == tail && compareAndSetTail(node, pred)) { // 如果是tail直接删除
        compareAndSetNext(pred, predNext, null);
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        if (pred != head &&
            ((ws = pred.waitStatus) == Node.SIGNAL ||
             (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
            pred.thread != null) {
            Node next = node.next;
            if (next != null && next.waitStatus <= 0) // 将next节点链接到pred的next上,链表的删除操作
                compareAndSetNext(pred, predNext, next);
        } else { // 前缀是head节点,则要唤醒后面的节点
            unparkSuccessor(node);
        }
        node.next = node; // help GC
    }
}

这个函数会在acquire异常的时候触发,异常则来自于前面的两个函数acquireInterruptibly和tryAcquireNanos。实现逻辑也很简单:先找到合法前缀节点,标记节点为cancelled,然后:如果node是队尾,直接删除;如果是队列中第二个节点(pred是head),则唤醒后面的节点;其他情况,则把node的next节点链接到pred后面,其实就是删除该节点。

1.5 release操作

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

release操作非常简单:先调用子类的tryRelease做资源状态等的修改,然后唤醒下一个等待signal的线程。因为,持有资源的线程是head的next节点,所以从head开始寻找下一个有效节点即可。

1.6 独占类型总结

从上面的分析,可以看出next指针,用来快速的通知下一个要唤醒的节点,很大程度上是一种优化手段,并不依赖。至于,每个节点的waitStatus,我们这里只涉及到了为signal和0的情况,其他几种情况还没有碰到。分析源代码的时候,尽量两个函数作为一对,一起来分析,才能够更容易的理解。

2.1 共享类型的acquire操作

和独占模式一样:先调用子类的tryAcquireShared函数,如果失败,则入队等待。该函数忽略interrupt。

tryAcquireShared函数的返回值:
>0: 成功,还有剩余资源,后续线程还可以继续请求;
=0: 成功,没有剩余资源,后续资源不能请求;
<0: 失败;

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);  // 和独占模式不同的地方,可以多个节点设置head
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

可以看出和独占模式的逻辑基本类似,只有在成功获取资源时的处理不同,即函数setHeadAndPropagate

 private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    if (propagate > 0 || h == null || h.waitStatus < 0 || // waitStatus < 0,表示上次有节点释放过资源,
        (h = head) == null || h.waitStatus < 0) {  // 但是没有找到等待的节点(通过propagate标记过)
        Node s = node.next;
        if (s == null || s.isShared())
            doReleaseShared();
    }
}

其中,setHead和独占模式是一样的,标记当前正在占有资源的线程节点。不一样的地方在于这个propagate

propagate的意思是:当唤醒一个propagate节点时,需要继续唤醒其后续节点,将这种唤醒状态“传播”下去。propagate只会设置在head节点上(参见下面的doReleaseShared代码)。

2.2 共享模式的release操作

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {  // 唤醒在等待signal的后续节点
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // 标记为propagate,并跳过本轮,等待下一轮
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

类似独占模式,releaseShared先调用子类的tryRelease,如果成功,则调用上面的函数,来操作队列。

2.3 为什么需要PROPAGATE这个waitStatus

这是因为:唤醒等待节点,只会发生在head节点的waitStatus为signal时,这是独占和共享模式相同的原则;

但是,独占模式和共享模式不同的地方在于:共享模式,可以多个节点同时设置head,表示自己占有资源。这样便会发生head的waitStatus为0的情况,然后在release时,对于这种情况的处理方式就是做一个标记(propagate),表示有其他节点曾经释放过资源,如果下一次获取资源时,碰到这种waitsStatus的head时,就可以直接进行release,唤醒下一个等待signal的节点,而不是等到下一次显式的调用releaseShared。

3.1 对条件的支持

aqs提供了一个ConditionObject的类,来实现Lock接口,提供类似Object的wait和notify的操作,进而实现在一个锁上的条件等待和唤醒。
在其内部,aqs同样使用了一个队列,叫做条件队列。里面的每个节点的waitStatus为condition。我们不妨将前面的队列称为同步队列,以做区分。所有在同一个条件上等待的线程都同步在这个条件队列上,当条件达成时,这个条件队列的等待最久的那个(第一个)会被移到同步队列中,然后就和之前的独占模式一样了。

3.2 ConditionObject的await操作

public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();  // new 一个condition status node,并加入“条件队列”
    int savedState = fullyRelease(node);  // 释放资源,并保存之前资源状态
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) { // 自旋:block当前线程,直到被唤醒或者有中断
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)  // 再次竞争资源
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0) // 如果之前有中断,再次throw
        reportInterruptAfterWait(interruptMode);
}

该函数实现了对某个条件的等待,同时在等待时如果有中断发生时,会在结束时throw出来。具体步骤如下:

说明:这里的“中断”,是取消等待某个条件达成,不是取消竞争资源。所以,在checkInterruptWhileWaiting中会将该node的status设置为0,并加入同步队列,参与竞争。

3.3 ConditionObject的signal操作

private void doSignal(Node first) {   // 传入的参数是第一个等待条件的节点
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;   // 如果设置失败,证明该节点已经失效,被cancel了,直接返回false

    Node p = enq(node);
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) // 前置节点cancel或者设置失败,重新唤醒该线程
        LockSupport.unpark(node.thread);   // 让其进入自旋,参与竞争
    return true;
}

signal操作主要是上面两个函数实现:从条件队列的第一个节点,向后找到第一个能被成功移到同步队列的节点,同时根据情况唤醒该节点。

4. 总结

  本文从源代码实现的角度,分别分析了aqs框架的独占模式、共享模式以及条件是如何实现的。着重对其“获取”和“释放”进行了原理分析。对后面分析各种同步工具做了基础性的铺垫。