- ReentrantLock的原理
- 公平锁实现原理(了解)
- 加锁源码
- 解锁源码
- 非公平锁实现原理
- 加锁源码
- 解锁源码
- 用模拟银行办理业务来源码解读非公平锁
- 当A执行lock()
- B执行lock()
- tryAcquire(arg)
- addWaiter(Node.EXCLUSIVE)
- acquireQueued(addWaiter(node, arg)
- tryAcquire(arg)
- shouldParkAfterFailedAcquire(p, node)
- C执行lock()
- tryAcquire(arg)
- addWaiter(Node.EXCLUSIVE)
- acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
- A执行unLock()
- tryRelease(arg)
- unparkSuccessor(h);
- B节点被唤醒
- tryAcquire(arg)
- setHead(node);
- 哨兵节点被GC回收
Lock接口的实现类,基本都是通过【聚合】了一个【队列同步器】的子类完成线程访问控制的
ReentrantLock的静态内部类Sync 继承了AQS
FairSync公平锁和NonfairSync非公平锁又继承了Sync
ReentrantLock默认是非公平锁,true为公平锁,false为非公平锁
可以明显看出公平锁与非公平锁的lock()方法唯一的区别就在于公平锁在获取同步状态时多了一个限制条件: hasQueuedPredecessors() hasQueuedPredecessors是
对比公平锁和非公平锁的 tryAcquire()方法的实现代码,其实差别就在于非公平锁获取锁时比公平锁中少了一个判断 !hasQueuedPredecessors()
hasQueuedPredecessors() 公平锁加锁时判断等待队列中是否存在有效前置节点的方法即判断了是否需要排队,
导致公平锁和非公平锁的差异如下:
公平锁:公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中;
非公平锁:不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。也就是说队列的第一个排队线程在unpark(),之后还是需要竞争锁(存在线程竞争的情况下)
Sync的实现类有两个
ReentrantLock.java
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
...
}
lock()方法调用AQS中的acquire(1)
AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
tryAcquire找到Sync的实现类FairSync公平锁的tryAcquire方法
ReentrantLock.java
//公平锁
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
// 上一个节点取消, 那么重构删除前面所有取消的节点, 返回到外层循环重试
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
// 这次还没有阻塞
// 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// ㈧ 阻塞当前线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
解锁源码
// 解锁实现
public void unlock() {
sync.release(1);
}
// AQS 的方法
public final boolean release(int arg) {
// 尝试释放锁, 进入 ㈠
if (tryRelease(arg)) {
// 队列头节点 unpark
Node h = head;
// 队列不为 null
if (h != null &&
// waitStatus == Node.SIGNAL 才需要 unpark
h.waitStatus != 0)
// unpark AQS 中等待的线程, 进入 ㈡
unparkSuccessor(h);
return true;
}
return false;
}
// ㈠ Sync 的方法,
protected final boolean tryRelease(int releases) {
// state--
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 支持锁重入, 只有 state 减为 0, 才释放成功
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
// ㈡ AQS 的方法
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
// 如果状态为 Node.SIGNAL 尝试重置状态为 0
// 不成功也可以
int ws = node.waitStatus;
if (ws 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus {
try {
lock.lock();
System.out.println("A come in");
TimeUnit.MINUTES.sleep(20);//A长期占用窗口
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}, "A").start();
//B发现业务窗口被A占用,去候客区等待,进入AQS队列,等待A办理完成,再尝试抢占受理窗口
new Thread(() -> {
try {
lock.lock();
System.out.println("B come in");
} finally {
lock.unlock();
}
}, "B").start();
//C同B 进入AQS队列 前面是B FIFO
new Thread(() -> {
try {
lock.lock();
System.out.println("C come in");
} finally {
lock.unlock();
}
}, "C").start();
}
}
ReentrantLock.java
public void lock() {
sync.lock();
}
调用ReentrantLock的静态内部类Sync的lock方法
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;
/**
* Performs {@link Lock#lock}. The main reason for subclassing
* is to allow fast path for nonfair version.
*/
abstract void lock();
...
}
找到Sync的实现类NonfairSync非公平锁
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
}
compareAndSetState(0, 1)
使用CAS判断当前state是否为0,如果为0就将state改成1.
此时A是第一个客户(线程),所以state为默认值0.
compareAndSetState(0, 1) 结果为true,将state设置为1.
setExclusiveOwnerThread(Thread.currentThread()
将线程指针指向当前线程,即设置为独占锁
AbstractQueuedSynchronizer.java
/**
* The current owner of exclusive mode synchronization.
*当前独占模式锁的持有者
*/
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
此时ThreadA获得独占锁,AQS的state = 1
B执行lock()/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
此时state为1,compareAndSetState(0, 1) 失败结果为false
进入acquire(1)
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
//抽象类的空实现
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
tryAcquire(arg)
是AQS的空实现 ,模板方法 ,找到它的实现类非公平锁NonfairSync
ReentrantLock.java
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();//当前线程就是B线程
int c = getState();//state=1
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//state=1 则判断当前线程和拥有偏向锁的线程是否相等
//当前线程是A,持有锁的线程是B,不相等返回false
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//跳到这里 将哨兵节点的等待状态从0改为-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//哨兵节点,0,-1
}
return false;
}
结束本次循环进行下次循环
AbstractQueuedSynchronizer.java
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {//第二次自旋
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {//tryAcquire(arg)失败 false
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//true
if (shouldParkAfterFailedAcquire(p, node) &&
//true, B算是真真正正的入队阻塞在这里了
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
...
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//哨兵节点此时等待状态-1
if (ws == Node.SIGNAL)//true
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;//返回
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
...
private final boolean parkAndCheckInterrupt() {
//B线程被阻塞在这里
LockSupport.park(this);
return Thread.interrupted();
}
当B线程进入 acquireQueued 逻辑
-
acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞
-
如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
-
进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 哨兵节点 的 waitStatus 改为 -1,这次返回 false
-
shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败
-
当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true
-
进入 parkAndCheckInterrupt, ThreadA 被park进入阻塞
ReentrantLock.java
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))//依然cas失败
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
}
AbstractQueuedSynchronizer.java
public final void acquire(int arg) {
//!tryAcquire(arg) true
if (!tryAcquire(arg) &&//
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
...
//实现类NonfairSync的tryAcquire
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
tryAcquire(arg)
ReentrantLock.java
final boolean nonfairTryAcquire(int acquires) {//1
final Thread current = Thread.currentThread();//此时当前线程是C线程
int c = getState();//1
if (c == 0) {//false
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//false
int nextc = c + acquires;
if (nextc 0) {//false
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//修改B的等待状态 0改为-1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;//返回
}
下次循环
如果前驱节点的 waitStatus 是 SIGNAL状态,即 shouldParkAfterFailedAcquire 方法会返回 true 程序会继续向下执行 parkAndCheckInterrupt 方法,用于将当前线程挂起
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {//第二次循环
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//再次判断 true
if (shouldParkAfterFailedAcquire(p, node) &&
//C阻塞
parkAndCheckInterrupt())//true
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
...
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;//B等待状态 -1
if (ws == Node.SIGNAL)//true
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;//返回
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
...
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);//C被阻塞在这里
return Thread.interrupted();
}
每次都是当前节点修改前节点的waitStatus为-1,队尾的WaitStatus为0
此时B、C均被 LockSupport.park挂起,处于阻塞状态
A执行unLock()public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {//true
Node h = head;//头节点指向的是哨兵节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
tryRelease(arg)
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
找到实现类
protected final boolean tryRelease(int releases) {//1
int c = getState() - releases;//1-1=0
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {//true
free = true;
setExclusiveOwnerThread(null);//将拥有锁的线程设置为null
}
setState(c);//设置同步状态设置为0
return free;//true
}
ThreadA释放锁,进入 tryRelease 流程,如果成功 设置 exclusiveOwnerThread 为 null state = 0
unparkSuccessor(h);private void unparkSuccessor(Node node) {//哨兵节点
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;//-1
if (ws 0) {// -1 false
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?