您当前的位置: 首页 > 

Dongguo丶

暂无认证

  • 2浏览

    0关注

    472博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

从ReentrantLock源码解读AQS

Dongguo丶 发布时间:2021-09-26 15:56:43 ,浏览量:2

文章目录
  • ReentrantLock的原理
    • 公平锁实现原理(了解)
      • 加锁源码
      • 解锁源码
    • 非公平锁实现原理
      • 加锁源码
      • 解锁源码
    • 用模拟银行办理业务来源码解读非公平锁
      • 当A执行lock()
      • B执行lock()
        • tryAcquire(arg)
        • addWaiter(Node.EXCLUSIVE)
        • acquireQueued(addWaiter(node, arg)
          • tryAcquire(arg)
          • shouldParkAfterFailedAcquire(p, node)
      • C执行lock()
        • tryAcquire(arg)
        • addWaiter(Node.EXCLUSIVE)
        • acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
      • A执行unLock()
        • tryRelease(arg)
        • unparkSuccessor(h);
        • B节点被唤醒
          • tryAcquire(arg)
          • setHead(node);
          • 哨兵节点被GC回收

ReentrantLock的原理

Lock接口的实现类,基本都是通过【聚合】了一个【队列同步器】的子类完成线程访问控制的

image-20210909111822833

image-20210909111938089

ReentrantLock的静态内部类Sync 继承了AQS

FairSync公平锁和NonfairSync非公平锁又继承了Sync

image-20210909111944764

image-20210909111949375

ReentrantLock默认是非公平锁,true为公平锁,false为非公平锁

image-20210909111955535

可以明显看出公平锁与非公平锁的lock()方法唯一的区别就在于公平锁在获取同步状态时多了一个限制条件: hasQueuedPredecessors() hasQueuedPredecessors是

对比公平锁和非公平锁的 tryAcquire()方法的实现代码,其实差别就在于非公平锁获取锁时比公平锁中少了一个判断 !hasQueuedPredecessors()

hasQueuedPredecessors() 公平锁加锁时判断等待队列中是否存在有效前置节点的方法即判断了是否需要排队,

导致公平锁和非公平锁的差异如下:

公平锁:公平锁讲究先来先到,线程在获取锁时,如果这个锁的等待队列中已经有线程在等待,那么当前线程就会进入等待队列中;

非公平锁:不管是否有等待队列,如果可以获取锁,则立刻占有锁对象。也就是说队列的第一个排队线程在unpark(),之后还是需要竞争锁(存在线程竞争的情况下)

image-20210909124016650

Sync的实现类有两个

image-20210909124635730

公平锁实现原理(了解) 加锁源码

ReentrantLock.java

static final class FairSync extends Sync {
    private static final long serialVersionUID = -3000897897090466540L;

    final void lock() {
        acquire(1);
    }
...
}    

lock()方法调用AQS中的acquire(1)

AbstractQueuedSynchronizer.java

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

tryAcquire找到Sync的实现类FairSync公平锁的tryAcquire方法

ReentrantLock.java


    //公平锁
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc  0) {
        /*
         * Predecessor was cancelled. Skip over predecessors and
         * indicate retry.
         */
        // 上一个节点取消, 那么重构删除前面所有取消的节点, 返回到外层循环重试
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
         // 这次还没有阻塞
         // 但下次如果重试不成功, 则需要阻塞,这时需要设置上一个节点状态为 Node.SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}
 // ㈧ 阻塞当前线程
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
解锁源码
 // 解锁实现
public void unlock() {
    sync.release(1);
}
 // AQS 的方法
public final boolean release(int arg) {
    // 尝试释放锁, 进入 ㈠
    if (tryRelease(arg)) {
         // 队列头节点 unpark
        Node h = head;
        // 队列不为 null
        if (h != null &&
             // waitStatus == Node.SIGNAL 才需要 unpark
            h.waitStatus != 0)
             // unpark AQS 中等待的线程, 进入 ㈡
            unparkSuccessor(h);
        return true;
    }
    return false;
}
// ㈠ Sync 的方法,
protected final boolean tryRelease(int releases) {
    // state--
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
     // 支持锁重入, 只有 state 减为 0, 才释放成功
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}
 // ㈡ AQS 的方法
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
     // 如果状态为 Node.SIGNAL 尝试重置状态为 0
     // 不成功也可以
    int ws = node.waitStatus;
    if (ws  0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus  {
            try {
                lock.lock();
                System.out.println("A come in");
                TimeUnit.MINUTES.sleep(20);//A长期占用窗口
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock.unlock();
            }
        }, "A").start();
		//B发现业务窗口被A占用,去候客区等待,进入AQS队列,等待A办理完成,再尝试抢占受理窗口
        new Thread(() -> {
            try {
                lock.lock();
                System.out.println("B come in");
            } finally {
                lock.unlock();
            }
        }, "B").start();
        //C同B 进入AQS队列 前面是B FIFO
        new Thread(() -> {
            try {
                lock.lock();
                System.out.println("C come in");
            } finally {
                lock.unlock();
            }
        }, "C").start();
    }
}

image-20210926120711582

当A执行lock()

ReentrantLock.java

public void lock() {
    sync.lock();
}

调用ReentrantLock的静态内部类Sync的lock方法

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = -5179523762034025860L;

    /**
     * Performs {@link Lock#lock}. The main reason for subclassing
     * is to allow fast path for nonfair version.
     */
    abstract void lock();
    ...
}

找到Sync的实现类NonfairSync非公平锁

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = 7316153563782823691L;

    /**
     * Performs lock.  Try immediate barge, backing up to normal
     * acquire on failure.
     */
    final void lock() {
        if (compareAndSetState(0, 1))
            setExclusiveOwnerThread(Thread.currentThread());
        else
            acquire(1);
    }
}

compareAndSetState(0, 1)

使用CAS判断当前state是否为0,如果为0就将state改成1.

此时A是第一个客户(线程),所以state为默认值0.

compareAndSetState(0, 1) 结果为true,将state设置为1.

setExclusiveOwnerThread(Thread.currentThread()

将线程指针指向当前线程,即设置为独占锁

AbstractQueuedSynchronizer.java


    /**
     * The current owner of exclusive mode synchronization.
     *当前独占模式锁的持有者
     */
private transient Thread exclusiveOwnerThread;

protected final void setExclusiveOwnerThread(Thread thread) {
    exclusiveOwnerThread = thread;
}

image-20210926121934603

此时ThreadA获得独占锁,AQS的state = 1

B执行lock()
/**
 * Performs lock.  Try immediate barge, backing up to normal
 * acquire on failure.
 */
final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

此时state为1,compareAndSetState(0, 1) 失败结果为false

进入acquire(1)


        /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success.  This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

//抽象类的空实现
protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
}
tryAcquire(arg)

是AQS的空实现 ,模板方法 ,找到它的实现类非公平锁NonfairSync

ReentrantLock.java

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}


	final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();//当前线程就是B线程
            int c = getState();//state=1
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
        	//state=1  则判断当前线程和拥有偏向锁的线程是否相等
        	//当前线程是A,持有锁的线程是B,不相等返回false
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc  0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        //跳到这里   将哨兵节点的等待状态从0改为-1
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//哨兵节点,0,-1
    }
    return false;
}

image-20210926133416991

结束本次循环进行下次循环

AbstractQueuedSynchronizer.java
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {//第二次自旋
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {//tryAcquire(arg)失败 false
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //true
            if (shouldParkAfterFailedAcquire(p, node) &&
                //true, B算是真真正正的入队阻塞在这里了
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

...
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;//哨兵节点此时等待状态-1
        if (ws == Node.SIGNAL)//true
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;//返回
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

...
    private final boolean parkAndCheckInterrupt() {
    //B线程被阻塞在这里
        LockSupport.park(this);
        return Thread.interrupted();
    }

当B线程进入 acquireQueued 逻辑

  1. acquireQueued 会在一个死循环中不断尝试获得锁,失败后进入 park 阻塞

  2. 如果自己是紧邻着 head(排第二位),那么再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败

  3. 进入 shouldParkAfterFailedAcquire 逻辑,将前驱 node,即 哨兵节点 的 waitStatus 改为 -1,这次返回 false

  4. shouldParkAfterFailedAcquire 执行完毕回到 acquireQueued ,再次 tryAcquire 尝试获取锁,当然这时 state 仍为 1,失败

  5. 当再次进入 shouldParkAfterFailedAcquire 时,这时因为其前驱 node 的 waitStatus 已经是 -1,这次返回 true

  6. 进入 parkAndCheckInterrupt, ThreadA 被park进入阻塞

C执行lock()

ReentrantLock.java

 static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;

        /**
         * Performs lock.  Try immediate barge, backing up to normal
         * acquire on failure.
         */
        final void lock() {
            if (compareAndSetState(0, 1))//依然cas失败
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
    }

AbstractQueuedSynchronizer.java


public final void acquire(int arg) {
    //!tryAcquire(arg) true
    if (!tryAcquire(arg) &&//
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

...
    //实现类NonfairSync的tryAcquire
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
tryAcquire(arg)

ReentrantLock.java

final boolean nonfairTryAcquire(int acquires) {//1
    final Thread current = Thread.currentThread();//此时当前线程是C线程
    int c = getState();//1
    if (c == 0) {//false
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {//false
        int nextc = c + acquires;
        if (nextc  0) {//false
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            //修改B的等待状态 0改为-1
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;//返回
    }

下次循环

如果前驱节点的 waitStatus 是 SIGNAL状态,即 shouldParkAfterFailedAcquire 方法会返回 true 程序会继续向下执行 parkAndCheckInterrupt 方法,用于将当前线程挂起

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {//第二次循环
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            //再次判断  true
            if (shouldParkAfterFailedAcquire(p, node) &&
                //C阻塞
                parkAndCheckInterrupt())//true
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

...
        private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;//B等待状态  -1
        if (ws == Node.SIGNAL)//true
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;//返回
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }
...
        private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);//C被阻塞在这里
        return Thread.interrupted();
    }

image-20210926141500879

每次都是当前节点修改前节点的waitStatus为-1,队尾的WaitStatus为0

此时B、C均被 LockSupport.park挂起,处于阻塞状态

A执行unLock()
public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
    if (tryRelease(arg)) {//true
        Node h = head;//头节点指向的是哨兵节点
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}
tryRelease(arg)
protected boolean tryRelease(int arg) {
    throw new UnsupportedOperationException();
}

找到实现类

protected final boolean tryRelease(int releases) {//1
    int c = getState() - releases;//1-1=0
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    if (c == 0) {//true
        free = true;
        setExclusiveOwnerThread(null);//将拥有锁的线程设置为null
    }
    setState(c);//设置同步状态设置为0
    return free;//true
}

image-20210926141612551

ThreadA释放锁,进入 tryRelease 流程,如果成功 设置 exclusiveOwnerThread 为 null state = 0

unparkSuccessor(h);
private void unparkSuccessor(Node node) {//哨兵节点
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;//-1
    if (ws  0) {// -1  false
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus             
关注
打赏
1638062488
查看更多评论
0.0480s