ReentrantReadWriteLock介绍、应用和原理

基本特性

一、概述

ReentrantLock是独占锁,某一时刻只有一个线程可以获取该锁,而实际上会存在很多读多写少的场景,而读操作本身并不会存在数据竞争问题,如果使用独占锁,可能会导致其中一个读线程使其他的读线程陷入等待,降低性能。

针对这种读多写少的场景,读写锁应运而生。读写锁允许同一时刻有多个读线程访问,但在写线程访问时,所有的读线程和其他写线程均被阻塞。

JUC包中的读写锁接口为ReadWriteLock:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
Lock readLock();//返回读锁

/**
* Returns the lock used for writing.
*
* @return the lock used for writing
*/
Lock writeLock();//返回写锁
}

读写锁其实就是维护了一对锁,一个写锁一个读锁,通过读写分离的策略,允许多个线程同时获取读锁,大大提高并发性。

特点

  • 读写锁的内部包含两把锁:
    • 一把是读(操作)锁,是一种共享锁;
    • 另一把是写(操作)锁,是一种独占锁。
  • 在没有写锁的时候,读锁可以被多个线程同时持有;
  • 写锁被一个线程持有,其他的线程不能再持有写锁,抢占写锁会阻塞,抢占读锁也会阻塞。

功能

  • 支持公平和非公平的获取锁的方式。
  • 支持可重入:读线程在获取读锁后还可以获得读锁,写线程获取写锁后可以再次获得写锁或者读锁。
  • 允许从写入锁降级为读取锁:先获取写锁,再获取读锁,最后释放写锁,不允许从读锁升级到写锁。
  • 读锁和写锁都支持锁获取期间的中断。
  • Condition支持:写入锁提供一个Condition实现,读取锁不支持Condition(抛出UnsupportedOperationException)。

二、类图分析

image-20230331150816032

1. ReentrantReadWriteLock实现了ReadWritrLock接口

源码为上方概述部分JUC包中的读写锁接口

2. FairSync、NonfairSync继承Sync类,提供了公平和非公平的实现

1
2
3
4
5
6
7
8
9
static final class NonfairSync extends Sync{

}
static final class FairSync extends Sync {

}
abstract static class Sync extends AbstractQueuedSynchronizer{

}

3. WriteLock、Sync、ReadLock、FairSync、NonfaruSyns都是ReadWritrLock的静态内部类

AQS 中只维护了一个 state 状态,而 ReentrantReadWriteLock 则需要维护读状态和写状态

1
2
3
public abstract class AbstractQueuedSynchronizer{
private volatile int state;//state是int类型 32位
}

用 state 的高16 位表示读状态,也就是获取到读锁的次数;使用低 16 位表示获取到写锁的线程的可重入次数 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static final int SHARED_SHIFT   = 16;

//共享锁(读锁)状态单位值 65536 1<<16 2^16
static final int SHARED_UNIT = (1 << SHARED_SHIFT);

//共享锁线程最大个数 65535
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;

//排它锁(写锁)掩码,二进制,15 个 1
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

//返回读锁线程数
/** Returns the number of shared holds represented in count */
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }

//返回写锁可重入个数
/** Returns the number of exclusive holds represented in count */
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

//firstReader 用来记录第一个获取到读锁的线程
private transient Thread firstReader = null;
//firstReaderHoldCount 则记录第 一个获取到读锁的线程获取读锁的可重入次数
private transient int firstReaderHoldCount;

//cachedHoldCounter 用来记录最后 一个获取读锁的线程获取读锁 的可重入次数
private transient HoldCounter cachedHoldCounter;

该部分,后面将进行详细介绍。

Sync类继承AQS,它的内部也存在两个内部类,分别为HoldCounterThreadLocalHoldCounter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//计数器,主要与读锁配套使用
static final class HoldCounter {
//计数
int count = 0;
// Use id, not reference, to avoid garbage retention
//获取当前线程的TID属性,该字段可以用来唯一标识一个线程
final long tid = getThreadId(Thread.currentThread());
}

//本地线程计数器
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
// 重写初始化方法,在没有进行set的情况下,获取的都是该HoldCounter值
public HoldCounter initialValue() {
return new HoldCounter();
}
}

三、读写状态设计

同步状态在前面重入锁的实现中是表示被同一个线程重复获取的次数,即一个整形变量来维护,但是之前的那个表示仅仅表示是否锁定,而不用区分是读锁还是写锁。而读写锁需要在同步状态(一个整形变量)上维护多个读线程和一个写线程的状态。

读写锁对于同步状态的实现是在一个整形变量上通过“按位切割使用”:将变量切割成两部分,高16位表示读,低16位表示写

image-20230331151037137

例如当前同步状态值为S,则读写状态的获取与操作如下所示

  • 获取写状态

    S&0x0000FFFF,即将高16位全部抹去

  • 获取读状态

    S>>>16:无符号补0,右移16位

  • 写状态加1

    S+1

  • 读状态加1

    **S+(1<<16)**:即S+0x00010000

在代码层面的判断中,如果S不等于0,当写状态(S&0x0000FFFF)等于0,而读状态(S>>>16)大于0,则表示该读写锁的读锁已被获取。

四、使用

提供一个数据容器类内部分别使用读锁保护数据的 read() 方法,写锁保护数据的 write() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Slf4j(topic = "c.DataContainer")
class DataContainer {
private Object data;
private ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
private ReentrantReadWriteLock.ReadLock r = rw.readLock();
private ReentrantReadWriteLock.WriteLock w = rw.writeLock();
public Object read() {
log.debug("获取读锁...");
r.lock();
try {
log.debug("读取");
sleep(1);
return data;
} finally {
log.debug("释放读锁...");
r.unlock();
}
}
public void write() {
log.debug("获取写锁...");
w.lock();
try {
log.debug("写入");
sleep(1);
} finally {
log.debug("释放写锁...");
w.unlock();
}
}
}
  • 测试 读锁-读锁 可以并发
1
2
3
4
5
6
7
8
9
public static void main(String[] args) {
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.read();
}, "t1").start();
new Thread(() -> {
dataContainer.read();
}, "t2").start();
}

运行结果

1
2
3
4
5
6
12:08:41.198 [t1] DEBUG c.DataContainer - 获取读锁...
12:08:41.198 [t2] DEBUG c.DataContainer - 获取读锁...
12:08:41.200 [t1] DEBUG c.DataContainer - 读取
12:08:41.201 [t2] DEBUG c.DataContainer - 读取
12:08:42.203 [t2] DEBUG c.DataContainer - 释放读锁...
12:08:42.203 [t1] DEBUG c.DataContainer - 释放读锁...

从输出结果可以看到 t1 线程锁定期间,t2 线程的读操作不受影响

  • 测试 读锁-写锁 相互阻塞
1
2
3
4
5
6
7
8
9
10
public static void main(String[] args) {
DataContainer dataContainer = new DataContainer();
new Thread(() -> {
dataContainer.read();
}, "t1").start();
sleep(0.1);
new Thread(() -> {
dataContainer.write();
}, "t2").start();
}

运行结果

1
2
3
4
5
6
12:10:50.689 [t1] DEBUG c.DataContainer - 获取读锁...
12:10:50.691 [t1] DEBUG c.DataContainer - 读取
12:10:50.788 [t2] DEBUG c.DataContainer - 获取写锁...
12:10:51.692 [t1] DEBUG c.DataContainer - 释放读锁...
12:10:51.692 [t2] DEBUG c.DataContainer - 写入
12:10:52.694 [t2] DEBUG c.DataContainer - 释放写锁...

从结果可以看出,当读锁被释放之后,写锁才能开始执行写入操作,即读锁和写锁产生了互斥。

  • 写锁-写锁 也是相互阻塞的,这里就不测试了

注意事项

  • 读锁不支持条件变量
  • 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
1
2
3
4
5
6
7
8
9
10
11
12
readLock.lock();
try {
// ...
writeLock.lock();
try {
// ...
} finally{
writeLock.unlock();
}
} finally{
readLock.unlock();
}
  • 重入时降级支持:即持有写锁的情况下去获取读锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
class CachedData {
Object data;
// 是否有效,如果失效,需要重新计算 data
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// 获取写锁前必须释放读锁
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// 判断是否有其它线程已经获取了写锁、更新了缓存, 避免重复更新
if (!cacheValid) {
data = ...
cacheValid = true;
}
// 降级为读锁, 释放写锁, 这样能够让其它线程读取缓存
rwl.readLock().lock();
} finally {

rwl.writeLock().unlock();
}
}
// 自己用完数据, 释放读锁
//避免直接释放写锁后,其他线程对于数据的更新对当前线程不可见
try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}

锁降级是指把持住写锁,再获取读锁,随后释放写锁的过程。

经过锁降级之后,写锁就会被降级为读锁。

之所以在释放写锁之前需要先获取读锁,是为了避免直接释放写锁后,其他线程获取到写锁来修改当前线程的数据,造成当前线程获取到读锁后,读取错误数据。

五、基本特性总结

读写互斥原则

  1. 读读相容
  2. 读写互斥
  3. 写写互斥

注意事项

1、读锁不支持条件变量

2、重入时升级不支持,即当前线程如果持有读锁,则在重入获得锁时不能在获得写锁

3、重入支持降级,即当前线程如果持有写锁,则在重入时可以获得读锁

缓存之应用

一、缓存更新策略

更新时,是先清缓存还是先更新数据库

先清缓存,再更新数据库

image-20230331151209367

结果:造成查询的值和数据库中的值不一致

先更新数据库,再清除缓存

image-20230331151239317

结果:造成A线程首次查询和后续查询得到不一致的结果,首次查询得到 x=1,后续查询发现已经清空了缓存,需要去数据库中查得 x=2

补充一种情况,假设查询线程 A 查询数据时恰好缓存数据由于时间到期失效,或是第一次查询

image-20230331151254228

这种情况的出现几率非常小

总结

  • 先清缓存:可能造成刚清理缓存还没有更新数据库,高并发下,其他线程直接查询了数据库过期数据到缓存中,这种情况非常严重,直接导致后续所有的请求缓存和数据库不一致。
  • 先更新据库:可能造成刚更新数据库,还没清空缓存就有线程从缓存拿到了旧数据,这种情况概率比较小,影响范围有限,只对这一次的查询结果有问题。

显而易见,通常情况下,先更新数据库,然后清空缓存。

二、读写锁实现一致性缓存

使用读写锁实现一个简单的按需加载缓存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class GenericCachedDao<T> {
// HashMap 作为缓存非线程安全, 需要保护
HashMap<SqlPair, T> map = new HashMap<>();
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
GenericDao genericDao = new GenericDao();

public int update(String sql, Object... params) {
SqlPair key = new SqlPair(sql, params);
// 加写锁, 防止其它线程对缓存读取和更改
lock.writeLock().lock();
try {
int rows = genericDao.update(sql, params);
map.clear();
return rows;
} finally {
lock.writeLock().unlock();
}
}

public T queryOne(Class<T> beanClass, String sql, Object... params) {
SqlPair key = new SqlPair(sql, params);
// 加读锁, 防止其它线程对缓存更改
lock.readLock().lock();
try {
T value = map.get(key);
if (value != null) {
return value;
}
} finally {
lock.readLock().unlock();
}
// 加写锁, 防止其它线程对缓存读取和更改
lock.writeLock().lock();
try {
// get 方法上面部分是可能多个线程进来的, 可能已经向缓存填充了数据
// 为防止重复查询数据库, 再次验证
T value = map.get(key);
if (value == null) {
// 如果没有, 查询数据库
value = genericDao.queryOne(beanClass, sql, params);
map.put(key, value);
}
return value;
} finally {
lock.writeLock().unlock();
}
}

// 作为 key 保证其是不可变的
class SqlPair {
private String sql;
private Object[] params;

public SqlPair(String sql, Object[] params) {
this.sql = sql;
this.params = params;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SqlPair sqlPair = (SqlPair) o;
return sql.equals(sqlPair.sql) &&
Arrays.equals(params, sqlPair.params);
}

@Override
public int hashCode() {
int result = Objects.hash(sql);
result = 31 * result + Arrays.hashCode(params);
return result;
}
}
}

注意

  • 以上实现体现的是读写锁的应用,保证缓存和数据库的一致性,但有下面的问题没有考虑
    • 适合读多写少,如果写操作比较频繁,以上实现性能低
    • 没有考虑缓存容量
    • 没有考虑缓存过期
    • 只适合单机
    • 并发性还是低,目前只会用一把锁
    • 更新方法太过简单粗暴,清空了所有 key(考虑按类型分区或重新设计 key)
  • 乐观锁实现:用 CAS 去更新

原理

一、ReentrantReadWriteLock简单流程

1.1 独占获取锁简单流程

image-20230331152254963

获取写锁流程

  1. 独占锁获取(writeLock写锁),首先判断是否有线程获取了锁,是否有线程获取了锁的判断通过读写锁中通过32位int类型state可以获取,其中低16位表示读锁,高16表示写锁。
  2. 有读锁:直接排队阻塞。
  3. 有写锁:还需要判断写锁线程是否是自己,如果是自己就是锁重入了,如果不是自己说明已经有其他的线程获取锁正在执行,那么当前线程需要排队阻塞。
  4. 无锁:直接获取锁,其他抢占的独占锁线程需要排队阻塞,当前线程执行完毕后释放锁通知下一个排队线程获取锁。

1.2 共享获取锁简单流程

image-20230331152307440

读锁获取锁流程

  1. 共享锁获取(readLock读锁),首先判断是否有线程获取了锁。
  2. 有读锁:当前线程发现此时读锁状态被占用,说明有线程获取了读锁。该线程通过cas自旋【死循环】获取到读锁为止。
  3. 有写锁:还需要判断持有写锁的线程是否是自己,如果是自己而且此时获取的是读锁会获取锁成功,我们称为锁降级,如果不是自己说明此时有其他线程获取了写锁,那么当前线程需要排队阻塞。
  4. 无锁:直接获取锁。

ReentrantReadWriteLock介绍、应用和原理
http://example.com/2023/03/31/ReentrantReadWriteLock介绍、应用和原理/
作者
程序员小魏
发布于
2023年3月31日
许可协议