AQS实现原理介绍

前言

Java.util.concurrent(J.U.C)大大提高了并发性能,AQS是JUC的核心,是阻塞式锁相关的同步器工具的框架,是一个主要用来构建锁和同步器的抽象类

一、AQS介绍

AQS 全程为 AbstractQueuedSynchronizer,它提供了一个 FIFO 队列,可以看成是一个用来实现同步锁及其它涉及到同步功能的核心组件,常见的有,ReentrantLock、CountDownLatch等

AQS是一个抽象类,主要通过继承的方式来使用,它本身没有实现任何的同步接口,仅仅是定义了同步状态的获取以及释放的方法来提高自定义的同步组件。

AQS 核心思想

  • 如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。
  • 如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中

AQS的功能

从使用层面来说,AQS 的功能分为两种:独占和共享。

  • 独占锁:每次只有一个线程持有锁,如:ReentrantLock 就是以独占方式实现的互斥锁
  • 共享锁:允许多个线程同时获取锁 ,并发访问共享资源 , 如:ReentrantReadWriteLock

AQS底层使用了模板方法模式

使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放) 将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。

AQS使用了模板方法模式,自定义同步器时需要重写下面几个AQS提供的模板方法:

1
2
3
4
5
isHeldExclusively()//该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int)//独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int)//独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int)//共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int)//共享方式。尝试释放资源,成功则返回true,失败则返回false。

默认情况下,每个方法都抛出 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS类中的其他方法都是final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。

以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。

AQS 的内部实现

AQS依赖内部的一个FIFO双向队列来完成同步状态的管理,当前线程获取锁失败时,AQS会将当前线程以及等待状态等信息构造成为一个节点(Node对象)并将其加入AQS中,同时会阻塞当前线程,当锁被释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

AQS中有一个头(head)节点和一个尾(tail)节点,中间每个节点(Node)都有一个prev和next指针指向前一个节点和后一个节点,如下图:

image-20230330231415999

Node(AQS的内部类)对象组成

AQS中每一个节点就是一个Node对象,并且通过节点中的状态等信息来控制队列,Node对象是AbstractQueuedSynchronizer对象中的一个静态内部类,下面就是Node对象的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
static final class Node {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;//表示当前线程状态是取消的
static final int SIGNAL = -1;//表示当前线程正在等待锁
static final int CONDITION = -2;//Condition队列有使用到,暂时用不到
static final int PROPAGATE = -3;//CountDownLatch等工具中使用到,暂时用不到
volatile int waitStatus;//节点中线程的状态,默认为0
volatile Node prev;//当前节点的前一个节点
volatile Node next;//当前节点的后一个节点
volatile Thread thread;//当前节点封装的线程信息
Node nextWaiter;//Condition队列中的关系,暂时用不到
final boolean isShared() {//暂时用不到
return nextWaiter == SHARED;
}

final Node predecessor() throws NullPointerException {//获取当前节点的上一个节点
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() {
}
Node(Thread thread, Node mode) {//构造一个节点:addWaiter方法中会使用,此时waitStatus默认等于0
this.nextWaiter = mode;
this.thread = thread;
}

Node(Thread thread, int waitStatus) { //构造一个节点:Condition中会使用
this.waitStatus = waitStatus;
this.thread = thread;
}
}

内部状态说明

状态 说明
static final int CANCELLED = 1 表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。
static final int SIGNAL = -1 表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。
static final int CONDITION = -2 表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。
static final int PROPAGATE = -3 共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。
volatile int waitStatus 新结点入队时的默认状态0

负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用>0、<0来判断结点的状态是否正常。

注意:Node对象并不是AQS才会使用,Condition队列以及其他工具中也会使用,所以有些状态和方法在这里是暂时用不上的本文就不会过多关注。

二、CLH队列锁

CLH队列锁是一种基于链表的可扩展,高性能,公平的自旋锁,申请资源的线程仅仅在本地变量上自旋,它不断轮询前驱节点的状态,假设发现前驱释放了锁,就结束自旋。

当一个线程需要获取锁时,先创建一个 QNode ,将其中的 locked 设置 true 表示需要获取锁, myPred 表示对其前驱结点的引用。

image-20230330231433272

其完整流程如下所示

  1. 假设线程A要获取资源,其先使自己成为队列的尾部,同时获取一个指向其前驱结点的引用 myPred,并不断在父节点引用上自旋判断。

image-20230330231456230

2.当另一个线程B同样也需要获取锁时,上述的过程同样也要来一遍,如下所示 (QNode-B):

image-20230330231515623

3.当某个线程要释放锁时,就将当前节点的 locked 设置为 false

其后续节点因为不断在自旋,当判断到其前序节点 lockedfalse ,就表明其前序节点已经释放锁,其自身就可以获取到锁,并且释放当前前序节点引用,以便GC回收。

image-20230330231529926

整个过程如上图所示,CLH队列锁的优点是空间复杂度低,如果有n个线程,L个锁,每个线程每次都只获取一个锁,那么其需要的存储空间 O(L+n) ,n个线程有n个 node,L 个锁有L个tail .

AQS 中的 CLH队列锁实现方式与上述方式相比是一种变体的实现,相比普通 CLH队列锁 ,AQS 中的实现方式做了相关的优化,比如不会不断重试,而会在重试相关次数后将线程阻塞。等待之后的唤醒。

三、AQS相关方法

模板方法

在我们实现自定义的同步组件时,将会调用同步器提供的模板方法。

相关方法如下

image-20230330231545855

上述模板方法同步器提供的模板方法分为3类:

  1. 独占式获取与释放同步状态
  2. 共享式获取与释放
  3. 同步状态和查询同步队列中的等待线程情况。

独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源

可重写的方法

image-20230330231559309

访问或修改同步状态的方法

在自定义的同步组件框架中,AQS 抽象方法在实现过程中免不了要对同步状态 state 进行更改,这时就需要同步器提供的3个方法来进行操作,因为他们能够保证状态的改变是安全的:

  • getState():获取当前同步状态
  • setState(newState:Int):设置当前同步状态
  • compareAndSetState(expect:Int,update:Int):使用 CAS 设置当前状态,该方法能保证状态设置的原子性。

四、自定义锁实现不可重入锁

自定义同步器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//独占锁,同步器类
class MySync extends AbstractQueuedSynchronizer {

@Override
protected boolean tryAcquire(int arg) {
if (compareAndSetState(0,1)) {
//加锁成功,并设置owner为当前线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

@Override
protected boolean tryRelease(int arg) {
setExclusiveOwnerThread(null);
setState(0); //state属性是volatile的,它具有写屏障可以保证它之前的操作对其他线程可见
return true;
}

//是否持有独占
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}

protected Condition newCondition(){
return new ConditionObject();
}
}

自定义锁

有了自定义同步器,很容易复用 AQS ,实现一个功能完备的自定义锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
final class MyLock implements Lock {

private MySync sync = new MySync();

//加锁(不成功会进入等待队列等待)
@Override
public void lock() {
sync.acquire(1);
}

//加锁,可打断
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

//尝试加锁(一次)
@Override
public boolean tryLock() {
return sync.tryAcquire(1);
}

//有时限的尝试加锁
@Override
public boolean tryLock(long time, @NotNull TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(time));
}

//解锁
@Override
public void unlock() {
sync.release(1);
}

//新建环境变量
@NotNull
@Override
public Condition newCondition() {
return sync.newCondition();
}
}

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public static void main(String[] args) {
MyLock lock = new MyLock();
new Thread(() -> {
lock.lock();
try {
log.debug("locking...");
sleep(1);
} finally {
log.debug("unlocking...");
lock.unlock();
}
},"t1").start();
new Thread(() -> {
lock.lock();
try {
log.debug("locking...");
} finally {
log.debug("unlocking...");
lock.unlock();
}
},"t2").start();
}

运行结果

1
2
3
4
14:37:31.020 [t1] DEBUG c.ThreadPoolDemo10 - locking...
14:37:32.024 [t1] DEBUG c.ThreadPoolDemo10 - unlocking...
14:37:32.024 [t2] DEBUG c.ThreadPoolDemo10 - locking...
14:37:32.024 [t2] DEBUG c.ThreadPoolDemo10 - unlocking...

从结果可以看出,成功使用MyLock对t1和t2加了锁,t2在t1解锁之后才继续执行。

同时实现的是不可重入锁,也就是同一个线程如果重复加锁也会发生阻塞,测试代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) {
MyLock lock = new MyLock();
new Thread(()->{
lock.lock();
log.debug("locking...");
lock.lock();
log.debug("relock...");

log.debug("unlocking...");
lock.unlock();
}, "t1").start();
}

运行结果

1
14:43:21.980 [t1] DEBUG c.ThreadPoolDemo10 - locking...

可以看到relock并没有被打印出来,也就是第二次lock.lock();并没加锁成功,而是同样被阻塞了。

五、基于AQS同步器实现的类

基于AQS同步器实现的类有很多,以下是其中一些常见的类:

  1. ReentrantLock:可重入互斥锁,可以防止死锁,支持公平和非公平性选择。
  2. ReentrantReadWriteLock:可重入读写锁,支持多个线程同时读取共享资源,在写操作时独占资源。
  3. CountDownLatch:一个同步工具类,用来协调多个线程之间的同步,可以让所有等待线程在某个事件完成后再同时执行。
  4. Semaphore:控制同时访问特定资源的线程数量,可以用来限制访问特定资源的线程数量。
  5. CyclicBarrier:也是一个同步工具类,它可以让多个线程在某个屏障处等待,直到所有线程都到达该处,然后再同时执行。
  6. Condition:一个条件变量,可以让线程挂起等待某个条件满足后再继续执行,通常与ReentrantLock一起使用。

这些类都是基于AQS同步器实现的,利用了AQS提供的底层同步机制来实现线程的协作与同步。


AQS实现原理介绍
http://example.com/2023/03/30/AQS实现原理介绍/
作者
程序员小魏
发布于
2023年3月30日
许可协议