AQS实现原理介绍
前言
Java.util.concurrent(J.U.C)大大提高了并发性能,AQS是JUC的核心,是阻塞式锁相关的同步器工具的框架,是一个主要用来构建锁和同步器的抽象类。
一、AQS介绍
AQS 全程为 AbstractQueuedSynchronizer,它提供了一个 FIFO 队列,可以看成是一个用来实现同步锁及其它涉及到同步功能的核心组件,常见的有,ReentrantLock、CountDownLatch等
AQS是一个抽象类,主要通过继承的方式来使用,它本身没有实现任何的同步接口,仅仅是定义了同步状态的获取以及释放的方法来提高自定义的同步组件。
AQS 核心思想
- 如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。
- 如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
AQS的功能
从使用层面来说,AQS 的功能分为两种:独占和共享。
- 独占锁:每次只有一个线程持有锁,如:ReentrantLock 就是以独占方式实现的互斥锁
- 共享锁:允许多个线程同时获取锁 ,并发访问共享资源 , 如:ReentrantReadWriteLock
AQS底层使用了模板方法模式
使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放) 将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
AQS使用了模板方法模式,自定义同步器时需要重写下面几个AQS提供的模板方法:
1 |
|
默认情况下,每个方法都抛出 UnsupportedOperationException。 这些方法的实现必须是内部线程安全的,并且通常应该简短而不是阻塞。AQS类中的其他方法都是final ,所以无法被其他类使用,只有这几个方法可以被其他类使用。
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
AQS 的内部实现
AQS依赖内部的一个FIFO双向队列来完成同步状态的管理,当前线程获取锁失败时,AQS会将当前线程以及等待状态等信息构造成为一个节点(Node对象)并将其加入AQS中,同时会阻塞当前线程,当锁被释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。
AQS中有一个头(head)节点和一个尾(tail)节点,中间每个节点(Node)都有一个prev和next指针指向前一个节点和后一个节点,如下图:
Node(AQS的内部类)对象组成
AQS中每一个节点就是一个Node对象,并且通过节点中的状态等信息来控制队列,Node对象是AbstractQueuedSynchronizer对象中的一个静态内部类,下面就是Node对象的源码:
1 |
|
内部状态说明
状态 | 说明 |
---|---|
static final int CANCELLED = 1 |
表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化。 |
static final int SIGNAL = -1 |
表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL。 |
static final int CONDITION = -2 |
表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁。 |
static final int PROPAGATE = -3 |
共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点。 |
volatile int waitStatus |
新结点入队时的默认状态0 |
负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用>0、<0来判断结点的状态是否正常。
注意:Node对象并不是AQS才会使用,Condition队列以及其他工具中也会使用,所以有些状态和方法在这里是暂时用不上的本文就不会过多关注。
二、CLH队列锁
CLH队列锁是一种基于链表的可扩展,高性能,公平的自旋锁,申请资源的线程仅仅在本地变量上自旋,它不断轮询前驱节点的状态,假设发现前驱释放了锁,就结束自旋。
当一个线程需要获取锁时,先创建一个 QNode ,将其中的 locked 设置 true 表示需要获取锁, myPred 表示对其前驱结点的引用。
其完整流程如下所示:
- 假设线程A要获取资源,其先使自己成为队列的尾部,同时获取一个指向其前驱结点的引用
myPred
,并不断在父节点引用上自旋判断。
2.当另一个线程B同样也需要获取锁时,上述的过程同样也要来一遍,如下所示 (QNode-B):
3.当某个线程要释放锁时,就将当前节点的 locked
设置为 false
。
其后续节点因为不断在自旋,当判断到其前序节点 locked
为 false
,就表明其前序节点已经释放锁,其自身就可以获取到锁,并且释放当前前序节点引用,以便GC回收。
整个过程如上图所示,CLH队列锁的优点是空间复杂度低,如果有n个线程,L个锁,每个线程每次都只获取一个锁,那么其需要的存储空间 O(L+n) ,n个线程有n个 node,L 个锁有L个tail .
AQS 中的 CLH队列锁实现方式与上述方式相比是一种变体的实现,相比普通 CLH队列锁 ,AQS 中的实现方式做了相关的优化,比如不会不断重试,而会在重试相关次数后将线程阻塞。等待之后的唤醒。
三、AQS相关方法
模板方法
在我们实现自定义的同步组件时,将会调用同步器提供的模板方法。
相关方法如下:
上述模板方法同步器提供的模板方法分为3类:
- 独占式获取与释放同步状态
- 共享式获取与释放
- 同步状态和查询同步队列中的等待线程情况。
独占模式是只有一个线程能够访问资源,而共享模式可以允许多个线程访问资源
可重写的方法
访问或修改同步状态的方法
在自定义的同步组件框架中,AQS 抽象方法在实现过程中免不了要对同步状态 state 进行更改,这时就需要同步器提供的3个方法来进行操作,因为他们能够保证状态的改变是安全的:
- getState():获取当前同步状态
- setState(newState:Int):设置当前同步状态
- compareAndSetState(expect:Int,update:Int):使用 CAS 设置当前状态,该方法能保证状态设置的原子性。
四、自定义锁实现不可重入锁
自定义同步器
1 |
|
自定义锁
有了自定义同步器,很容易复用 AQS ,实现一个功能完备的自定义锁
1 |
|
测试代码
1 |
|
运行结果
1 |
|
从结果可以看出,成功使用MyLock对t1和t2加了锁,t2在t1解锁之后才继续执行。
同时实现的是不可重入锁,也就是同一个线程如果重复加锁也会发生阻塞,测试代码如下:
1 |
|
运行结果
1 |
|
可以看到relock并没有被打印出来,也就是第二次lock.lock();
并没加锁成功,而是同样被阻塞了。
五、基于AQS同步器实现的类
基于AQS同步器实现的类有很多,以下是其中一些常见的类:
- ReentrantLock:可重入互斥锁,可以防止死锁,支持公平和非公平性选择。
- ReentrantReadWriteLock:可重入读写锁,支持多个线程同时读取共享资源,在写操作时独占资源。
- CountDownLatch:一个同步工具类,用来协调多个线程之间的同步,可以让所有等待线程在某个事件完成后再同时执行。
- Semaphore:控制同时访问特定资源的线程数量,可以用来限制访问特定资源的线程数量。
- CyclicBarrier:也是一个同步工具类,它可以让多个线程在某个屏障处等待,直到所有线程都到达该处,然后再同时执行。
- Condition:一个条件变量,可以让线程挂起等待某个条件满足后再继续执行,通常与ReentrantLock一起使用。
这些类都是基于AQS同步器实现的,利用了AQS提供的底层同步机制来实现线程的协作与同步。