为什么须要读写锁?
与传统锁不同的是读写锁的规则是可以共享读,但只能一个写,总结起来为:读读不互斥,读写互斥,写写互斥,而一样平常的独占锁是:读读互斥,读写互斥,写写互斥,而场景中每每读远远大于写,读写锁便是为了这种优化而创建出来的一种机制。
把稳是读远远大于写,一样平常情形下独占锁的效率低来源于高并发下对临界区的激烈竞争导致线程高下文切换。因此当并发不是很高的情形下,读写锁由于须要额外掩护读锁的状态,可能还不如独占锁的效率高。因此须要根据实际情形选择利用。
一个大略的读写锁实现
根据上面理论可以利用两个int变量来大略实现一个读写锁,实现虽然烂,但是事理都是差不多的,值得阅读下。
public class ReadWriteLock {
/
读锁持有个数
/
private int readCount = 0;
/
写锁持有个数
/
private int writeCount = 0;
/
获取读锁,读锁在写锁不存在的时候才能获取
/
public synchronized void lockRead() throws InterruptedException {
// 写锁存在,须要wait
while (writeCount > 0) {
wait();
}
readCount++;
}
/
开释读锁
/
public synchronized void unlockRead() {
readCount--;
notifyAll();
}
/
获取写锁,当读锁存在时须要wait.
/
public synchronized void lockWrite() throws InterruptedException {
// 先判断是否有写要求
while (writeCount > 0) {
wait();
}
// 此时已经不存在获取写锁的线程了,因此占坑,防止写锁饥饿
writeCount++;
// 读锁为0时获取写锁
while (readCount > 0) {
wait();
}
}
/
开释读锁
/
public synchronized void unlockWrite() {
writeCount--;
notifyAll();
}
}
ReadWriteLock的实现事理
在Java中ReadWriteLock的紧张实现为ReentrantReadWriteLock,其供应了以下特性:
公正性选择:支持公正与非公正(默认)的锁获取办法,吞吐量非公正优先于公正。可重入:读线程获取读锁之后可以再次获取读锁,写线程获取写锁之后可以再次获取写锁可降级:写线程获取写锁之后,其还可以再次获取读锁,然后开释掉写锁,那么此时该线程是读锁状态,也便是降级操作。ReentrantReadWriteLock的构造
ReentrantReadWriteLock的核心是由一个基于AQS的同步器Sync构成,然后由其扩展出ReadLock(共享锁),WriteLock(排它锁)所组成。
并且从ReentrantReadWriteLock的布局函数中可以创造ReadLock与WriteLock利用的是同一个Sync,详细怎么实现同一个行列步队既可以为共享锁,又可以表示排他锁下文会详细剖析。
清单一:ReentrantReadWriteLock布局函数
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
Sync的实现
sync是读写锁实现的核心,sync是基于AQS实现的,在AQS中核心是state字段和双端行列步队,那么一个一个问题来剖析。
Sync如何同时表示读锁与写锁?
清单2:读写锁状态获取
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
/ Returns the number of shared holds represented in count /
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/ Returns the number of exclusive holds represented in count /
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
从代码中获取读写状态可以看出其是把state(int32位)字段分成高16位与低16位,个中高16位表示读锁个数,低16位表示写锁个数,如下图所示(图来自Java并发编程艺术)。
该图表示当前一个线程获取到了写锁,并且重入了两次,因此低16位是3,并且该线程又获取了读锁,并且重入了一次,以是高16位是2,当写锁被获取时如果读锁不为0那么读锁一定是获取写锁的这个线程。
读锁的获取
读锁的获取紧张实现是AQS中的acquireShared方法,其调用过程如下代码。
清单3:读锁获取入口
// ReadLock
public void lock() {
sync.acquireShared(1);
}
// AQS
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
个中doAcquireShared(arg)方法是获取失落败之后AQS中入队操作,等待被唤醒后重新获取,那么关键点便是tryAcquireShared(arg)方法,方法有点长,因此先总结出获取读锁所经历的步骤,获取的第一部分步骤如下:
操作1:读写须要互斥,因此当存在写锁并且持有写锁的线程不是该线程时获取失落败。操作2:是否存在等待写锁的线程,存在的话则获取读锁须要等待,避免写锁饥饿。(写锁优先级是比较高的)操作3:CAS获取读锁,实际上是state字段的高16位自增。操作4:获取成功后再ThreadLocal中记录当前哨程获取读锁的次数。清单4:读锁获取的第一部分
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 操作1:存在写锁,并且写锁不是当前哨程则直接去排队
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
// 操作2:读锁是否该壅塞,对付非公正模式下写锁获取优先级会高,如果存在要获取写锁的线程则读锁须要让步,公正模式下则先来先到
if (!readerShouldBlock() &&
// 读锁利用高16位,因此存在获取上限为2^16-1
r < MAX_COUNT &&
// 操作3:CAS修正读锁状态,实际上是读锁状态+1
compareAndSetState(c, c + SHARED_UNIT)) {
// 操作4:实行到这里解释读锁已经获取成功,因此须要记录线程状态。
if (r == 0) {
firstReader = current; // firstReader是把读锁状态从0变成1的那个线程
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
// 这些代码实际上是从ThreadLocal中获取当前哨程重入读锁的次数,然后自增下。
HoldCounter rh = cachedHoldCounter; // cachedHoldCounter是上一个获取锁成功的线程
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
// 当操作2,操作3失落败时实行该逻辑
return fullTryAcquireShared(current);
}
当操作2,操作3失落败时会实行fullTryAcquireShared(current),为什么会这样写呢?个人认为是一种补偿操作,操作2与操作3失落败并不代表当前哨程没有读锁的资格,并且这里的读锁是共享锁,有资格就该当被获取成功,因此给予补偿获取读锁的操作。在fullTryAcquireShared(current)中是一个循环获取读锁的过程,大致步骤如下:
操作5:等同于操作2,存在写锁,且写锁线程并非当前哨程则直接返回失落败操作6:当前哨程是重入读锁,这里只会倾向第一个获取读锁的线程以及末了一个获取读锁的线程,其他都须要去AQS中排队。操作7:CAS改变读锁状态操作8:同操作4,获取成功后再ThreadLocal中记录当前哨程获取读锁的次数。清单5:读锁获取的第二部分
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
// 最外层嵌套循环
for (;;) {
int c = getState();
// 操作5:存在写锁,且写锁并非当前哨程则直接返回失落败
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
// 操作6:如果当前哨程是重入读锁则放行
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
// 当前是firstReader,则直接放行,解释是已获取的线程重入读锁
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
// 实行到这里解释是其他线程,如果是cachedHoldCounter(其count不为0)也便是上一个获取锁的线程则可以重入,否则进入AQS中排队
// 这里也是对写锁的让步,如果行列步队中头结点为写锁,那么当前获取读锁的线程要进入行列步队中排队
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current)) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
// 解释是上述刚初始化的rh,以是直接去AQS中排队
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error(\"大众Maximum lock count exceeded\"大众);
// 操作7:修正读锁状态,实际上读锁自增操作
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 操作8:对ThreadLocal中掩护的获取锁次数进行更新。
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
读锁的开释
清单6:读锁开释入口
// ReadLock
public void unlock() {
sync.releaseShared(1);
}
// Sync
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared(); // 这里实际上是开释读锁后唤醒写锁的线程操作
return true;
}
return false;
}
读锁的开释紧张是tryReleaseShared(arg)函数,因此拆解其步骤如下:
操作1:清理ThreadLocal中保存的获取锁数量信息操作2:CAS修正读锁个数,实际上是自减一清单7:读锁的开释流程
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
// 操作1:清理ThreadLocal对应的信息
if (firstReader == current) {;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
int count = rh.count;
// 已开释完的读锁的线程清空操作
if (count <= 1) {
readHolds.remove();
// 如果没有获取锁却开释则会报该缺点
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
// 操作2:循环中利用CAS修正读锁状态
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
写锁的获取
清单8:写锁的获取入口
// WriteLock
public void lock() {
sync.acquire(1);
}
// AQS
public final void acquire(int arg) {
// 考试测验获取,获取失落败后入队,入队失落败则interrupt当前哨程
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
写锁的获取也紧张是tryAcquire(arg)方法,这里也拆解步骤:
操作1:如果读锁数量不为0或者写锁数量不为0,并且不是重入操作,则获取失落败。操作2:如果当前锁的数量为0,也便是不存在操作1的情形,那么该线程是有资格获取到写锁,因此修正状态,设置独占线程为当前哨程清单9:写锁的获取
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
// 操作1:c != 0,解释存在读锁或者写锁
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
// 写锁为0,读锁不为0 或者获取写锁的线程并不是当前哨程,直接失落败
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error(\"大众Maximum lock count exceeded\"大众);
// Reentrant acquire
// 实行到这里解释是写锁线程的重入操作,直接修正状态,也不须要CAS由于没有竞争
setState(c + acquires);
return true;
}
// 操作2:获取写锁,writerShouldBlock对付非公正模式直接返回fasle,对付公正模式则线程须要排队,因此须要壅塞。
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
写锁的开释
清单10:写锁的开释入口
// WriteLock
public void unlock() {
sync.release(1);
}
// AQS
public final boolean release(int arg) {
// 开释锁成功后唤醒行列步队中第一个线程
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
写锁的开释紧张是tryRelease(arg)方法,其逻辑就比较大略了,注释很详细。
清单11:写锁的开释
protected final boolean tryRelease(int releases) {
// 如果当前哨程没有获取写锁却开释,则直接抛非常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 状态变更至nextc
int nextc = getState() - releases;
// 由于写锁是可以重入,以是在都开释完毕后要把独占标识清空
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
// 修正状态
setState(nextc);
return free;
}
一些其他问题
锁降级操作哪里表示?
锁降级操作指的是一个线程获取写锁之后再获取读锁,然后读锁开释掉写锁的过程。在tryAcquireShared(arg)获取读锁的代码中有如下代码。
清单12:写锁降级策略
Thread current = Thread.currentThread();
// 当前状态
int c = getState();
// 存在写锁,并且写锁不即是当前哨程时返回,换句话说等写锁为当前哨程时则可以连续往下获取读锁。
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
。。。。。读锁获取。。。。。
那么锁降级有什么用?答案是为了可见性的担保。在ReentrantReadWriteLock的javadoc中有如下代码,其是锁降级的一个运用示例。
class CachedData {
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
void processCachedData() {
// 获取读锁
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock,不开释的话下面写锁会获取不堪利,造成去世锁
rwl.readLock().unlock();
// 获取写锁
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
// 这里再次获取读锁,如果不获取那么当写锁开释后可能其他写线程再次得到写锁,导致下方`use(data)`时涌现不一致的征象
// 这个操作便是降级
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}
try {
// 利用完后开释读锁
use(data);
} finally {
rwl.readLock().unlock();
}
}
}}
公正与非公正的差异
清单13:公正下的Sync
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors(); // 行列步队中是否有元素,有责当前操作须要block
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();// 行列步队中是否有元素,有责当前操作须要block
}
}
公正下的Sync实现策略是所有获取的读锁或者写锁的线程都须要入行列步队队,按照顺序依次去考试测验获取锁。
清单14:非公正下的Sync
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
// 非公正下不考虑排队,因此写锁可以竞争获取
return false; // writers can always barge
}
final boolean readerShouldBlock() {
/ As a heuristic to avoid indefinite writer starvation,
block if the thread that momentarily appears to be head
of queue, if one exists, is a waiting writer. This is
only a probabilistic effect since a new reader will not
block if there is a waiting writer behind other enabled
readers that have not yet drained from the queue.
/
// 这里实际上是一个优先级,如果行列步队中头部元素时写锁,那么读锁须要等待,避免写锁饥饿。
return apparentlyFirstQueuedIsExclusive();
}
}
非公正下由于抢占式获取锁,写锁是可能产生饥饿,因此办理办法便是提高写锁的优先级,换句话说获取写锁之前先占坑。
作者:牛李,一个正在努力学习的码农,紧张关注后端领域、代码设计,以及一些有趣的技能。GitHub: https://github.com/mrdear
本文系作者投稿文章。欢迎投稿。
投稿内容哀求
互联网技能干系,包括但不限于开拓措辞、网络、数据库、架构、运维、前端、DevOps(DevXXX)、AI、区块链、存储、移动、安全、技能团队管理等内容。文章不须要首发,可以是已经在开源中国博客或网上其它平台发布过的。但是鼓励首发,首发内容被收录可能性较大。如果你是记录某一次办理了某一个问题(这在博客中占绝大比例),那么须要将问题的前因后果描述清楚,最直接的便是结合图文等办法将问题复现,同时完全地解释办理思路与终极成功的方案。如果你是剖析某一技能理论知识,请从定义、运用处景、实际案例、关键技能细节、不雅观点等方面,对其进行较为全面地先容。如果你因此实际案例分享自己或者公司对诸如某一架构模型、通用技能、编程措辞、运维工具的实践,那么请将事宜干系背景、详细技能细节、演进过程、思考、运用效果等方面描述清楚。其它未尽 case 详细情形详细剖析,不虚的,文章投过来试试先,比如我们并不谢绝就某个热点事宜对其进行的报导、深入解析。投稿办法
以 Word 或者 Markdown 文档的形式将稿件投递到 oscbianji@oschina.cn 邮箱主要解释
作者须要拥有所投文章的所有权,不能将别人的文章拿过来投递。投递的文章须要经由审核,如果开源中国编辑以为须要的话,将与作者一起进一步完善文章,意在使文章更佳、传播更广。文章版权归作者所有,开源中国得到文章的传播权,可在开源中国各个平台进行文章传播,同时保留文章原始出处和作者信息,可在官方博客中标原创标签。关注开源中国OSC头条号,获取更多优质内容推送。点击“理解更多”阅读原文。