基本特性
一、概述
ReentrantLock
是独占锁,某一时刻只有一个线程可以获取该锁,而实际上会存在很多读多写少的场景,而读操作本身并不会存在数据竞争问题,如果使用独占锁,可能会导致其中一个读线程使其他的读线程陷入等待,降低性能。
针对这种读多写少的场景,读写锁应运而生。读写锁允许同一时刻有多个读线程访问,但在写线程访问时,所有的读线程和其他写线程均被阻塞。
JUC包中的读写锁接口为ReadWriteLock
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public interface ReadWriteLock {
Lock readLock();
Lock writeLock(); }
|
读写锁其实就是维护了一对锁,一个写锁一个读锁,通过读写分离的策略,允许多个线程同时获取读锁,大大提高并发性。
特点
- 读写锁的内部包含两把锁:
- 一把是读(操作)锁,是一种共享锁;
- 另一把是写(操作)锁,是一种独占锁。
- 在没有写锁的时候,读锁可以被多个线程同时持有;
- 写锁被一个线程持有,其他的线程不能再持有写锁,抢占写锁会阻塞,抢占读锁也会阻塞。
功能
- 支持公平和非公平的获取锁的方式。
- 支持可重入:读线程在获取读锁后还可以获得读锁,写线程获取写锁后可以再次获得写锁或者读锁。
- 允许从写入锁降级为读取锁:先获取写锁,再获取读锁,最后释放写锁,不允许从读锁升级到写锁。
- 读锁和写锁都支持锁获取期间的中断。
- Condition支持:写入锁提供一个Condition实现,读取锁不支持Condition(抛出
UnsupportedOperationException
)。
二、类图分析
1. ReentrantReadWriteLock实现了ReadWritrLock接口
源码为上方概述部分JUC包中的读写锁接口
2. FairSync、NonfairSync继承Sync类,提供了公平和非公平的实现
1 2 3 4 5 6 7 8 9
| static final class NonfairSync extends Sync{
} static final class FairSync extends Sync {
} abstract static class Sync extends AbstractQueuedSynchronizer{
}
|
3. WriteLock、Sync、ReadLock、FairSync、NonfaruSyns都是ReadWritrLock的静态内部类
AQS 中只维护了一个 state 状态,而 ReentrantReadWriteLock
则需要维护读状态和写状态
1 2 3
| public abstract class AbstractQueuedSynchronizer{ private volatile int state; }
|
用 state 的高16 位表示读状态,也就是获取到读锁的次数;使用低 16 位表示获取到写锁的线程的可重入次数 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;
private transient HoldCounter cachedHoldCounter;
|
该部分,后面将进行详细介绍。
Sync类继承AQS,它的内部也存在两个内部类,分别为HoldCounter
和ThreadLocalHoldCounter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| static final class HoldCounter { int count = 0; final long tid = getThreadId(Thread.currentThread()); }
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } }
|
三、读写状态设计
同步状态在前面重入锁的实现中是表示被同一个线程重复获取的次数,即一个整形变量来维护,但是之前的那个表示仅仅表示是否锁定,而不用区分是读锁还是写锁。而读写锁需要在同步状态(一个整形变量)上维护多个读线程和一个写线程的状态。
读写锁对于同步状态的实现是在一个整形变量上通过“按位切割使用”:将变量切割成两部分,高16位表示读,低16位表示写。
例如当前同步状态值为S,则读写状态的获取与操作如下所示
在代码层面的判断中,如果S不等于0,当写状态(S&0x0000FFFF)等于0,而读状态(S>>>16)大于0,则表示该读写锁的读锁已被获取。
四、使用
提供一个数据容器类内部分别使用读锁保护数据的 read()
方法,写锁保护数据的 write()
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| @Slf4j(topic = "c.DataContainer") class DataContainer { private Object data; private ReentrantReadWriteLock rw = new ReentrantReadWriteLock(); private ReentrantReadWriteLock.ReadLock r = rw.readLock(); private ReentrantReadWriteLock.WriteLock w = rw.writeLock(); public Object read() { log.debug("获取读锁..."); r.lock(); try { log.debug("读取"); sleep(1); return data; } finally { log.debug("释放读锁..."); r.unlock(); } } public void write() { log.debug("获取写锁..."); w.lock(); try { log.debug("写入"); sleep(1); } finally { log.debug("释放写锁..."); w.unlock(); } } }
|
1 2 3 4 5 6 7 8 9
| public static void main(String[] args) { DataContainer dataContainer = new DataContainer(); new Thread(() -> { dataContainer.read(); }, "t1").start(); new Thread(() -> { dataContainer.read(); }, "t2").start(); }
|
运行结果
1 2 3 4 5 6
| 12:08:41.198 [t1] DEBUG c.DataContainer - 获取读锁... 12:08:41.198 [t2] DEBUG c.DataContainer - 获取读锁... 12:08:41.200 [t1] DEBUG c.DataContainer - 读取 12:08:41.201 [t2] DEBUG c.DataContainer - 读取 12:08:42.203 [t2] DEBUG c.DataContainer - 释放读锁... 12:08:42.203 [t1] DEBUG c.DataContainer - 释放读锁...
|
从输出结果可以看到 t1
线程锁定期间,t2
线程的读操作不受影响
1 2 3 4 5 6 7 8 9 10
| public static void main(String[] args) { DataContainer dataContainer = new DataContainer(); new Thread(() -> { dataContainer.read(); }, "t1").start(); sleep(0.1); new Thread(() -> { dataContainer.write(); }, "t2").start(); }
|
运行结果
1 2 3 4 5 6
| 12:10:50.689 [t1] DEBUG c.DataContainer - 获取读锁... 12:10:50.691 [t1] DEBUG c.DataContainer - 读取 12:10:50.788 [t2] DEBUG c.DataContainer - 获取写锁... 12:10:51.692 [t1] DEBUG c.DataContainer - 释放读锁... 12:10:51.692 [t2] DEBUG c.DataContainer - 写入 12:10:52.694 [t2] DEBUG c.DataContainer - 释放写锁...
|
从结果可以看出,当读锁被释放之后,写锁才能开始执行写入操作,即读锁和写锁产生了互斥。
注意事项
- 读锁不支持条件变量
- 重入时升级不支持:即持有读锁的情况下去获取写锁,会导致获取写锁永久等待
1 2 3 4 5 6 7 8 9 10 11 12
| readLock.lock(); try { writeLock.lock(); try { } finally{ writeLock.unlock(); } } finally{ readLock.unlock(); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); void processCachedData() { rwl.readLock().lock(); if (!cacheValid) { rwl.readLock().unlock(); rwl.writeLock().lock(); try { if (!cacheValid) { data = ... cacheValid = true; } rwl.readLock().lock(); } finally {
rwl.writeLock().unlock(); } } try { use(data); } finally { rwl.readLock().unlock(); } } }
|
锁降级是指把持住写锁,再获取读锁,随后释放写锁的过程。
经过锁降级之后,写锁就会被降级为读锁。
之所以在释放写锁之前需要先获取读锁,是为了避免直接释放写锁后,其他线程获取到写锁来修改当前线程的数据,造成当前线程获取到读锁后,读取错误数据。
五、基本特性总结
读写互斥原则
- 读读相容
- 读写互斥
- 写写互斥
注意事项
1、读锁不支持条件变量
2、重入时升级不支持,即当前线程如果持有读锁,则在重入获得锁时不能在获得写锁
3、重入支持降级,即当前线程如果持有写锁,则在重入时可以获得读锁
缓存之应用
一、缓存更新策略
更新时,是先清缓存还是先更新数据库
先清缓存,再更新数据库
结果:造成查询的值和数据库中的值不一致
先更新数据库,再清除缓存
结果:造成A线程首次查询和后续查询得到不一致的结果,首次查询得到 x=1,后续查询发现已经清空了缓存,需要去数据库中查得 x=2
补充一种情况,假设查询线程 A 查询数据时恰好缓存数据由于时间到期失效,或是第一次查询
这种情况的出现几率非常小
总结
- 先清缓存:可能造成刚清理缓存还没有更新数据库,高并发下,其他线程直接查询了数据库过期数据到缓存中,这种情况非常严重,直接导致后续所有的请求缓存和数据库不一致。
- 先更新据库:可能造成刚更新数据库,还没清空缓存就有线程从缓存拿到了旧数据,这种情况概率比较小,影响范围有限,只对这一次的查询结果有问题。
显而易见,通常情况下,先更新数据库,然后清空缓存。
二、读写锁实现一致性缓存
使用读写锁实现一个简单的按需加载缓存
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79
| class GenericCachedDao<T> { HashMap<SqlPair, T> map = new HashMap<>(); ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); GenericDao genericDao = new GenericDao();
public int update(String sql, Object... params) { SqlPair key = new SqlPair(sql, params); lock.writeLock().lock(); try { int rows = genericDao.update(sql, params); map.clear(); return rows; } finally { lock.writeLock().unlock(); } }
public T queryOne(Class<T> beanClass, String sql, Object... params) { SqlPair key = new SqlPair(sql, params); lock.readLock().lock(); try { T value = map.get(key); if (value != null) { return value; } } finally { lock.readLock().unlock(); } lock.writeLock().lock(); try { T value = map.get(key); if (value == null) { value = genericDao.queryOne(beanClass, sql, params); map.put(key, value); } return value; } finally { lock.writeLock().unlock(); } }
class SqlPair { private String sql; private Object[] params;
public SqlPair(String sql, Object[] params) { this.sql = sql; this.params = params; }
@Override public boolean equals(Object o) { if (this == o) { return true; } if (o == null || getClass() != o.getClass()) { return false; } SqlPair sqlPair = (SqlPair) o; return sql.equals(sqlPair.sql) && Arrays.equals(params, sqlPair.params); }
@Override public int hashCode() { int result = Objects.hash(sql); result = 31 * result + Arrays.hashCode(params); return result; } } }
|
注意
- 以上实现体现的是读写锁的应用,保证缓存和数据库的一致性,但有下面的问题没有考虑
- 适合读多写少,如果写操作比较频繁,以上实现性能低
- 没有考虑缓存容量
- 没有考虑缓存过期
- 只适合单机
- 并发性还是低,目前只会用一把锁
- 更新方法太过简单粗暴,清空了所有 key(考虑按类型分区或重新设计 key)
- 乐观锁实现:用 CAS 去更新
原理
一、ReentrantReadWriteLock简单流程
1.1 独占获取锁简单流程
获取写锁流程
- 独占锁获取(writeLock写锁),首先判断是否有线程获取了锁,是否有线程获取了锁的判断通过读写锁中通过32位int类型state可以获取,其中低16位表示读锁,高16表示写锁。
- 有读锁:直接排队阻塞。
- 有写锁:还需要判断写锁线程是否是自己,如果是自己就是锁重入了,如果不是自己说明已经有其他的线程获取锁正在执行,那么当前线程需要排队阻塞。
- 无锁:直接获取锁,其他抢占的独占锁线程需要排队阻塞,当前线程执行完毕后释放锁通知下一个排队线程获取锁。
1.2 共享获取锁简单流程
读锁获取锁流程
- 共享锁获取(readLock读锁),首先判断是否有线程获取了锁。
- 有读锁:当前线程发现此时读锁状态被占用,说明有线程获取了读锁。该线程通过cas自旋【死循环】获取到读锁为止。
- 有写锁:还需要判断持有写锁的线程是否是自己,如果是自己而且此时获取的是读锁会获取锁成功,我们称为锁降级,如果不是自己说明此时有其他线程获取了写锁,那么当前线程需要排队阻塞。
- 无锁:直接获取锁。