- ReentrantLock 定义
- 使用ReentrantLock示例
- 使用ReentrantLock的好处
- ReentrantLock提供了tryLock可设置等待时长
- ReentrantLock可以设置是否公平锁
- 可手动加锁和释放锁
- ReentrantLock可实现多条件的绑定
- ReentrantLock的原理 AQS
- ReentrantLock与AQS的关系
- 一图简单了解AQS
- AQS中的state如何实现加锁
- AQS中的state如何实现可重入加锁的
- 加锁失败时AQS中的队列介绍
- 加锁失败时入队列详解
- 加锁失败时addWaiter方法详解
- 加锁失败时acquireQueued方法详解
- 第三个线程尝试加锁详解
- 基于state释放锁详解
- 释放锁后唤醒队列中其他线程进行加锁详解
ReentrantLock 根据其jdk给的注释定义如下:
A reentrant mutual exclusion Lock
即代表ReentrantLock是可重入的互斥锁.
- 可重入代表已加锁的线程, 可以重复加锁. 可重入的最大次数为int的最大值:2147483647
- 互斥锁即代表只能有一个线程上锁成功, 其他线程想要加锁只能阻塞等待.
根据jdk给的ReentrantLock示例代码如下
class X {
private final ReentrantLock lock = new ReentrantLock();
// ...
public void m() {
lock.lock(); // block until condition holds
try {
// ... method body
} finally {
lock.unlock()
}
}
}
即官方建议释放锁的操作放在finally 代码块中, 这样可以在一定程度上避免死锁. 在创建ReentrantLock 时, 默认的无参构造是创建一个非公平锁, 可以传递一个布尔值, 来设定是否为公平锁.
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
除了上面给出的lock方法外, 也可以使用tryLock
, 传递一个等待加锁的时间, 返回一个布尔值. 加锁成功 则返回true, 失败返回false , 则代表到了设定的时间还是加锁失败.
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
使用ReentrantLock的好处
ReentrantLock提供了tryLock可设置等待时长
例如上文提到的ReentrantLock提供了可设置等待时长的方法, tryLock(long timeout, TimeUnit unit)
, 如果到时间了还没加锁成功, 则放弃加锁, 这样也能节省服务器资源.
java中的synchronized关键字只能是非公平的, 是一种抢占式的加锁. 而ReentrantLock中可以在构造方法设置是否为公平锁. 使用更加灵活. 其公平锁主要实现思路为在加锁的方法执行之前, 一定会先去判断当前的阻塞队列中是否有线程在排队, 如果有, 则只能是队列中的第一个线程加锁成功.
可手动加锁和释放锁相比较于synchronized 只能jvm去释放锁, ReentrantLock 可以自己通过调用方法去手动的加锁和释放锁, 释放锁一般是写在finally代码块中. 这也存在一定的弊端, 如果没有调用unlock方法 ,则会造成死锁, 因此在使用 ReentrantLock 时, 一定加锁和释放锁配对使用.
ReentrantLock可实现多条件的绑定一个ReentrantLock对象, 可以绑定多个Condition , 通过调用await和signal方法, 来进行线程的精确唤醒. 而synchronized只能要么随机唤醒一个线程, 要么唤醒所有的线程.
ReentrantLock的原理 AQS ReentrantLock与AQS的关系ReentrantLock默认的构造方法如下 :
public ReentrantLock() {
sync = new NonfairSync();
}
可以看到实际上是new了一个NonfairSync
对象. 而NonfairSync
对象的源码如下, 它实际上是继承了Sync 类.
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
Sync 类 的源码截图如下, Sync 本身是ReentrantLock 一个内部抽象类, 提供了很多模板方法. 它实际上是继承了抽象类AbstractQueuedSynchronizer , 而AbstractQueuedSynchronizer其实就是所说的AQS. 因此ReentrantLock 是基于AQS的. AbstractQueuedSynchronizer的核心主要是node, 自定义的一个内部类, 双向链表(实现的一个阻塞等待队列), state用于实现加锁, 释放锁.
AQS的结构如上图所示. 流程如下
- 线程1 加锁的时候, aqs会去判断state 是否为0 , 如果是0 , 代表没有线程加锁, 则线程1可以加锁, state变量进行cas操作给加1, 并且会维护当前加锁的线程为1. 如果线程1 重复加锁, 则state会累加1.
- 线程2加锁的时候, 此时aqs判断state不为0, 则线程2加锁失败, 进入aqs的队列, 进行等待入队.
- 线程1 释放锁, 当释放到state 为0的时候, 会唤醒队列中阻塞等待的线程.
- 线程2被唤醒, 出队列去尝试加锁, 此时state为0 , 加锁成功, 当前加锁的线程变为线程2.
根据ReentrantLock 默认的构造方法的内部类NonfairSync 如下的源码, 加锁的时候, 调用lock方法, NonfairSync类对其进行了重写.
static final class NonfairSync extends Sync {
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
}
先是去执行compareAndSetState
方法, 该方法为AQS的类中的方法. 具体实现如下. 可以看到其是调用jdk底层的Unsafe 类的cas方法, 去进行state变量的操作了. cas是一种无锁化的操作, 提升了加锁的性能.
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
回到NonfairSync
类, 如果成功把state修改为1了, 则代表加锁成功, 则会执行setExclusiveOwnerThread
方法, 即设置当前加锁的线程. 该方法为AbstractOwnableSynchronizer
类中的方法.
当线程1加锁成功后, 再次尝试加锁时, 由于此时state不为0 ,则206行的if判断为false ,则会走209行的代码, 执行acqure方法, 传递的参数为1.
acquire代码如下, 是一个if判断, 先执行tryAcquire方法, 传递进去1.
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire方法 在aqs中是一个空的方法, 需要看子类的实现. 实现的方法在ReentrantLock类中的内部类NonfairSync中, 执行nonfairTryAcquire方法.
该方法的代码如下:
final boolean nonfairTryAcquire(int acquires) {
// 获取当前加锁线程.
final Thread current = Thread.currentThread();
// 获取state的值
int c = getState();
// 此处判断c是否等于0 ,是为了一开始有线程加锁, 但之后又释放锁, 因此此处再次去尝试加锁.
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
// 此处判断加锁的线程是否为当前线程
else if (current == getExclusiveOwnerThread()) {
// 是当前线程则把stata加1 c为state的值, acquires为1
int nextc = c + acquires;
if (nextc 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
compareAndSetWaitStatus
方法内容如下, 是进行cas操作, 把第一个节点的waitStatus 的值设置为 Node.SIGNAL
, 并且返回一个false
private static final boolean compareAndSetWaitStatus(Node node, int expect,int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,expect, update);
}
设置完waitStatus值后, 回到如下的for循环, 由于shouldParkAfterFailedAcquire
方法返回的是false, 此时还没有代码执行return操作, 因此, 还会继续循环下面的代码, 依然是获取第一个节点, tryAcquire方法去加锁依然会失败, 会再次执行 shouldParkAfterFailedAcquire
方法
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
再次进入到shouldParkAfterFailedAcquire
方法后, 由于上一轮的循环中, 已经把pred节点的waitStatus设置成为了SIGNAL
, 那么则会直接返回true.
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
// Node.SIGNAL = -1
if (ws == Node.SIGNAL)
return true;
return false;
}
shouldParkAfterFailedAcquire
方法返回true之后, 则会执行上一步for循环if判断中的parkAndCheckInterrupt
方法 该方法内容如下: 主要的操作就是执行 LockSupport.park
方法, 把第二个尝试加锁的线程, 进行挂起, 等待第一个线程释放锁的时候, 执行LockSupport.unpark
方法去唤醒该线程.
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
第三个线程尝试加锁详解
根据上面的流程, 此时的状况入下图所示, 线程A加锁成功, 线程B被park挂起 假如再有第三个线程来进行尝试加锁, 根据上面的分析, 则会走到AQS的
addWaiter
方法中去. 如下所示.
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
Node node = new Node(Thread.currentThread(), mode)
这行代码则是把线程3 封装为一个node对象, 接着获取尾节点, 此时尾节点不为空. 那么则执行node.prev = pred
这一行代码. 即把第三个线程的node的prev指向第二个线程的node对象. 如下图所示 接着执行
compareAndSetTail(pred, node)
cas操作, 把尾节点设置成线程C. 则变成如下的情况: 最后执行
pred.next = node;
来结束addWaiter
方法. 把线程B的next指向第三个线程的node. 如下图所示. 接着去执行AQS的
acquireQueued
方法. 与上面提到的步骤一样, 在for循环中, 设置线程B的waitStatus
的值为SIGNAL
, 接着线程C执行 LockSupport.park(this)
方法被挂起. 此时情形如下图所示.线程B和C都被挂起.
释放锁调用unlock
方法, 该方法如下
public void unlock() {
sync.release(1);
}
release
方法如下
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
首先回调用tryRelease
方法. 该方法在AQS中是一个空的方法
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
需要找到其子类的实现. 在ReentrantLock类的Sync内部类中对该方法进行了实现, 代码如下:
protected final boolean tryRelease(int releases) {
// relases的值为1 , 把state的值减1
int c = getState() - releases;
// 校验当前释放锁的线程是否为加锁的线程, 不是则抛异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 判断state的值是否为0
if (c == 0) {
free = true;
// 释放锁完成, 把当前加锁的线程设置为null
setExclusiveOwnerThread(null);
}
// 设置state最新的值,并且state是被volatile修饰的, 保证可见性
setState(c);
// 如果state为0 则返回true, state不为0 , 返回false
return free;
}
释放锁后唤醒队列中其他线程进行加锁详解
在上一小节中的release
方法中, 假如此时其他线程已经完全释放锁了, 即state为0了, 那么tryRelease
返回的则是true, 则会走下面node相关的代码.
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
Node h = head;
是获取当前队列中的头节点, 并把值给h. 即如下图这一步 接着来了一个if判断
if (h != null && h.waitStatus != 0)
此时h是不等于空的, 并且h.waitStatus
的值是signal 为-1, 是不等于0 的, 因此if判断为true, 那么 则会走 unparkSuccessor(h)
方法, 并且传递的是头节点, 此节点是没有要加锁的阻塞等待的线程的, 它的下一个节点才有.
unparkSuccessor
方法的内容如下, 根据其注释此方法会唤醒其下一个节点, 即线程B
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
// 获取第一个节点的waitStatus
int ws = node.waitStatus;
if (ws 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?