Java并发编程学习十四:AQS框架

一、AQS框架简介

AQS,即AbstractQueuedSynchronizer,在 ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch、ThreadPoolExcutor 的 Worker 中都有运用(JDK 1.8),AQS 是这些类的底层原理。

为什么要有AQS呢?是因为对于ReentrantLock、Semaphore等线程协作工具类而言,它们有很多工作是类似的,所以如果能把实现类似工作的代码给提取出来,变成一个新的底层工具类(或称为框架)的话,就可以直接使用这个工具类来构建上层代码了,而这个工具类其实就是 AQS。

AQS至少为线程协作工具类实现了以下内容:

  • 状态的原子性管理
  • 线程的阻塞与解除阻塞
  • 队列的管理

总而言之,AQS 是一个用于构建锁、同步器等线程协作工具类的框架,有了 AQS 以后,很多用于线程协作的工具类就都可以很方便的被写出来,有了 AQS 之后,可以让更上层的开发极大的减少工作量,避免重复造轮子,同时也避免了上层因处理不当而导致的线程安全问题,因为 AQS 把这些事情都做好了。

二、AQS原理简介

这里仅仅介绍AQS的三大部分:状态、队列和期望协作工具类去实现的获取/释放等重要方法,对于AQS 的源码分析,可以参考:源码解析

1. 状态

AQS想要去管理或者想作为协作工具类的一个基础框架,那么它必然要管理一些状态,而这个状态在 AQS 内部就是用 state 变量去表示的。

/**
* The synchronization state.
*/
private volatile int state;

这个state的含义,会根据具体实现类的作用不同而不同。

对于信号量Semaphore,state 表示的是剩余许可证的数量;对于CountDownLatch,state 表示的是需要“倒数”的数量;对于ReentrantLock ,state表示的是锁的占有情况(因为可重入,state是大于等0的正整数)。

如果需要基于AQS创建新的协作工具类,一定也需要利用 state,为这个类表示它所需要的业务逻辑和状态。

在AQS框架中,state是会被多个线程共享的,会被并发地修改,所以所有去修改 state 的方法都必须要保证 state 是线程安全的。但state的定义中,只有volatile 修饰,显然是不足以保证线程安全的。那么AQS如何保证修改时的线程安全呢?

以与state相关的方法 compareAndSetState 及 setState为例,它们的实现已经由 AQS 去完成,实际需要可以直接调用。

protected final boolean compareAndSetState(int expect, int update) {

    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

对于compareAndSetState而言,它利用了Unsafe 里面的 CAS 操作,利用 CPU 指令的原子性保证了这个操作的原子性

protected final void setState(int newState) {

    state = newState;
}

对于setState方法,并没有做其他并发安全的处理,原因在于,当对基本类型的变量进行直接赋值时,volatile 关键字可以确保线程安全。因为它不涉及读取之前的值,也不涉及在原来值的基础上再修改。

2. FIFO 队列

这个队列最主要的作用是存储等待的线程,在并发场景下,大部分的线程是抢不到的,此时就得需要有一个队列来存放、管理它们。AQS 的一大功能就是充当线程的“排队管理器”。

当多个线程去竞争同一把锁的时候,就需要用排队机制把那些没能拿到锁的线程串在一起;而当前面的线程释放锁之后,这个管理器就会挑选一个合适的线程来尝试抢刚刚释放的那把锁。AQS 一直在维护这个队列,并把等待的线程都放到队列里面。

file

队列内部是双向链表的形式,分别用 head 和 tail 来表示头节点和尾节点,两者在初始化的时候都指向了一个空节点。头节点可以理解为“当前持有锁的线程”,而在头节点之后的线程就被阻塞了,它们会等待被唤醒,唤醒也是由 AQS 负责操作的。

3. 获取/释放方法

这些方法是协作工具类的逻辑的具体体现,需要每一个协作工具类自己去实现,所以在不同的工具类中,它们的实现和含义各不相同。

a. 获取方法

获取操作通常会依赖 state 变量的值,根据 state 值不同,协作工具类也会有不同的逻辑,并且在获取的时候也经常会阻塞。

对于ReentrantLock ,lock方法就是其中一个“获取方法”,执行时,如果发现 state 不等于 0 且当前线程不是持有锁的线程,那么就代表这个锁已经被其他线程所持有了。这个时候,当然就获取不到锁,于是就让该线程进入阻塞状态。

对于Semaphore ,acquire 方法就是其中一个“获取方法”,作用是获取许可证,此时能不能获取到这个许可证也取决于 state 的值。如果 state 值是正数,那么代表还有剩余的许可证,数量足够的话,就可以成功获取;但如果 state 是 0,则代表已经没有更多的空余许可证了,此时这个线程就获取不到许可证,会进入阻塞状态,

对于CountDownLatch ,await 方法(包含重载方法)是一个“获取方法”,作用是“等待,直到倒数结束”。执行 await 的时候会判断 state 的值,如果 state 不等于 0,线程就陷入阻塞状态,直到其他线程执行倒数方法把 state 减为 0,此时就代表现在这个门闩放开了,所以之前阻塞的线程就会被唤醒。

b. 释放方法

释放方法是站在获取方法的对立面的,通常和刚才的获取方法配合使用,但是释放方法通常是不会阻塞线程的。

比如Semaphore 中的release 方法(包含重载方法),作用是去释放一个许可证,会让 state 加 1;再比如CountDownLatch 中的countDown 方法,作用是倒数一个数,让 state 减 1。

三、CountDownLatch中AQS的使用

1. 利用AQS自定义线程协作工具类

要想使用AQS自定义线程协作工具类,需要分为以下三步:

  • 新建一个自己的线程协作工具类,在内部写一个 Sync 类,该 Sync 类继承 AbstractQueuedSynchronizer,即 AQS;
  • 想好设计的线程协作工具类的协作逻辑,在 Sync 类里,根据是否是独占,来重写对应的方法。如果是独占,则重写 tryAcquire 和 tryRelease 等方法;如果是非独占,则重写 tryAcquireShared 和 tryReleaseShared 等方法;
  • 在自己的线程协作工具类中,实现获取/释放的相关方法,并在里面调用 AQS 对应的方法,如果是独占则调用 acquire 或 release 等方法,非独占则调用 acquireShared 或 releaseShared 或 acquireSharedInterruptibly 等方法。

这里有个注意点,根据某些条件来重写特定的一部分方法,为什么不通过实现接口的方法?这样自然就知道需要重写其中哪些方法了。这里确实采用的继承AbstractQueuedSynchronizer,然后自己去判断选择哪些方法进行重写。

原因在于,如果是实现接口的话,那每一个抽象方法都需要实现。但是实际上并不是每个方法都需要重写,根据需求的不同,有选择的去实现一部分就足以了,所以就设计为不采用实现接口,而采用继承类并重写方法的形式。

那问题又来了,继承之后不强制要求重写方法的,可不可以一个方法都不重写呢?

答案是,不行,因为在执行的时候会抛出异常。AQS内部的 tryAcquire、tryRelease、tryAcquireShared 和 tryReleaseShared 方法内部只有一行实现代码,就是直接抛出异常。

protected boolean tryAcquire(int arg) {

    throw new UnsupportedOperationException();
}

protected boolean tryRelease(int arg) {

    throw new UnsupportedOperationException();
}

protected int tryAcquireShared(int arg) {

  throw new UnsupportedOperationException();
}

protected boolean tryReleaseShared(int arg) {

    throw new UnsupportedOperationException();
}

2. AQS 在 CountDownLatch 的应用

对应于上面的三个步骤,一起来看看CountDownLatch 如何使用AQS的。

首先看源码

public class CountDownLatch {

    /**
     * Synchronization control For CountDownLatch.
     * Uses AQS state to represent count.
     */

    private static final class Sync extends AbstractQueuedSynchronizer {

        private static final long serialVersionUID = 4982264981922014374L;
        Sync(int count) {

            setState(count);
        }
        int getCount() {

            return getState();
        }
        protected int tryAcquireShared(int acquires) {

            return (getState() == 0) ? 1 : -1;
        }
        protected boolean tryReleaseShared(int releases) {

            // Decrement count; signal when transition to zero
            for (;;) {

                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }
    private final Sync sync;
   //省略其他代码...
}

可以看到,CountDownLatch 中有一个内部类 Sync 类继承了 AQS,在 CountDownLatch 里面还有一个 sync 的变量,正是 Sync 类的一个对象。同时,Sync 不但继承了 AQS 类,而且还重写了 tryAcquireShared 和 tryReleaseShared 方法。这里可以对应上述步骤的第一步和第二步。

接下来看看CountDownLatch 的4个最重要的方法:

a. 构造函数

对于CountDownLatch,构造函数只有一个,入的参数是需要“倒数”的次数

public CountDownLatch(int count) {

    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

当count < 0 时会抛出异常,当 count > = 0,即代码 this.sync = new Sync( count ) ,往 Sync 中传入了 count,这个里的 Sync 的构造方法如下:

Sync(int count) {

     setState(count);
}

该构造函数调用了 AQS 的 setState 方法,并且把 count 传进去了,而 setState 正是给 AQS 中的 state 变量赋值的

protected final void setState(int newState) {

    state = newState;
}

因此,通过 CountDownLatch 构造函数将传入的 count 最终传递到 AQS 内部的 state 变量,给 state 赋值

b. getCount方法

该方法作用是获取当前剩余的还需要“倒数”的数量。

public long getCount() {

     return sync.getCount();
}

该方法return 的是 sync 的 getCount:

int getCount() {

     return getState();
}

继续追踪getCount 方法

protected final int getState() {

    return state;
}

protected final int getState 方法直接 return 的就是 state 的值,所以最终它获取到的就在 AQS 中 state 变量的值。

c. countDown方法

该方法其实就是 CountDownLatch 的“释放”方法

public void countDown() {

    sync.releaseShared(1);
}

countDown 方法中调用的是 sync 的 releaseShared 方法

public final boolean releaseShared(int arg) {

    if (tryReleaseShared(arg)) {

        doReleaseShared();
        return true;
    }
    return false;
}

releaseShared 先进行 if 判断,判断 tryReleaseShared 方法的返回结果,tryReleaseShared 源码如下所示 :

protected boolean tryReleaseShared(int releases) {

    // Decrement count; signal when transition to zero
    for (;;) {

        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}

该方法的逻辑是:

  • for 的死循环内,先通过 getState 拿到当前 state 的值并赋值给变量 c,如果此时 c = 0,则意味着已经倒数为零了,直接 return false ,再往上看上一层的 releaseShared 方法,就会直接跳过整个 if (tryReleaseShared(arg)) 代码块,直接返回 false,相当于 releaseShared 方法不产生效果,也就意味着 countDown 方法不产生效果。
  • 如果 c 不等于 0,在这里会先把 c-1 的值赋给 nextc,然后再利用 CAS 尝试把 nextc 赋值到 state 上。如果赋值成功就代表本次 countDown 方法操作成功,也就意味着把 AQS 内部的 state 值减了 1
  • 接下来是return nextc == 0,如果 nextc 为 0,意味着本次倒数后恰好达到了规定的倒数次数,门闩应当在此时打开,所以 tryReleaseShared 方法会返回 true,那么再回到之前的 releaseShared 方法中,可以看到,接下来会调用 doReleaseShared 方法,效果是对之前阻塞的线程进行唤醒,让它们继续执行

d. await方法

该方法是 CountDownLatch 的“获取”方法,调用 await 方法会把线程阻塞,直到倒数为 0 才能继续执行。

public void await() throws InterruptedException {

    sync.acquireSharedInterruptibly(1);
}

它会调用 sync 的 acquireSharedInterruptibly ,并且传入 1

 public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {

    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

上述方法中,除了对于中断的处理之外,比较重要的就是 tryAcquireShared 方法。这个方法很简单,它会直接判断 getState 的值是不是等于 0,如果等于 0 就返回 1,不等于 0 则返回 -1。

protected int tryAcquireShared(int acquires) {

    return (getState() == 0) ? 1 : -1;
}

如果tryAcquireShared返回1,即“倒数”的数量不为0,if (tryAcquireShared(arg) < 0)为true,执行 doAcquireSharedInterruptibly 方法,然后会让线程进入阻塞状态。如果tryAcquireShared返回-1,即“倒数”的数量为0,意味着门闩是打开状态,await 方法会立刻返回,此时线程就不会进入阻塞状态。