您当前的位置: 首页 > 

Dongguo丶

暂无认证

  • 1浏览

    0关注

    472博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

ReentrantReadWriteLock源码解析

Dongguo丶 发布时间:2021-09-27 18:28:57 ,浏览量:1

文章目录
  • 源码分析
    • t1 w.lock,t2 r.lock
      • t1执行 w.lock成功上锁,
        • tryAcquire(arg)
          • writerShouldBlock()
      • t2 执行 r.lock,
        • tryAcquireShared(arg)
        • doAcquireShared(arg)
          • addWaiter()
          • tryAcquireShared()
    • t3 r.lock,t4 w.lock
      • t3 r.lock
      • t4 w.lock
        • tryAcquire(arg)
        • addWaiter(Node.EXCLUSIVE), arg)
    • t1 w.unlock
      • tryRelease(arg)
      • unparkSuccessor(h)
      • t2被唤醒
      • tryAcquireShared(arg)
        • readerShouldBlock()
      • setHeadAndPropagate(node, r)
        • setHead(node)
        • doReleaseShared()
        • t3被唤醒
    • t2 r.unlock,t3 r.unlock
      • t2 r.unlock
        • tryReleaseShared(arg)
      • t3 r.unlock
        • tryReleaseShared(arg)
        • doReleaseShared()
        • t4被唤醒
          • tryAcquire(arg)
          • setHead(node)
          • acquireQueued返回到acquire

源码分析

读写锁用的是同一个 Sycn 同步器(继承AQS同步器),因此等待队列、state 等也是同一个

假设一个场景有4个线程t1、t2、t3、t4依次是t1线程获取写锁,t2线程获取读锁,t3线程获取读锁,t4线程获取写锁

t1 w.lock,t2 r.lock t1执行 w.lock成功上锁,

流程与 ReentrantLock 加锁相比没有特殊之处,不同是写锁状态占了 state 的低 16 位,而读锁使用的是 state 的高 16 位

ReentrantReadWriteLock.java

//WriteLock的方法
public void lock() {
    sync.acquire(1);
}
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
tryAcquire(arg)
protected final boolean tryAcquire(int acquires) {
    /*
     * Walkthrough:
     * 1. If read count nonzero or write count nonzero
     *    and owner is a different thread, fail.
     * 2. If count would saturate, fail. (This can only
     *    happen if count is already nonzero.)
     * 3. Otherwise, this thread is eligible for lock if
     *    it is either a reentrant acquire or
     *    queue policy allows it. If so, update state
     *    and set owner.
     */
    /*
     *介绍:1。如果读计数非零或写计数非零且所有者是不同的线程,则失败。
     *2. 如果计数饱和,则失败。(这只会在count已经非零的情况下发生。)
     *3否则,如果可重入获取或队列策略允许,则该线程有资格获得锁。如果是,请更新状态并设置所有者。
     */
    Thread current = Thread.currentThread();//t1
    int c = getState();//获得同步状态0
    int w = exclusiveCount(c);//写锁数量
    //读锁或写锁已经被其他线程获得
    if (c != 0) {//t1为第一个线程 c=0
        // (Note: if c != 0 and w == 0 then shared count != 0)
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        setState(c + acquires);
        return true;
    }
    //非公平锁false
    if (writerShouldBlock() ||
        //cas操作 state从0变为1
        !compareAndSetState(c, c + acquires))
        return false;
    //cas成功则当前线程t1持有写锁
    setExclusiveOwnerThread(current);
    return true;
}
writerShouldBlock()

对于非公平锁 NonfairSync返回false

final boolean writerShouldBlock() {
    return false; // writers can always barge
}

对于公平锁FairSync会判断如果在当前线程之前有一个在AQS队列中排队的线程,返回true

如果队列的头节点(哨兵节点后指针指向的线程节点)或队列为空,则返回false

final boolean writerShouldBlock() {
    return hasQueuedPredecessors();
}

hasQueuedPredecessors()

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    //队列的tail尾指针与head头指针指向不同节点
    return h != t &&
        //哨兵节点的next后指针指向的节点为null
        ((s = h.next) == null || 
         //哨兵节点的后继节点不是当前节点
         s.thread != Thread.currentThread());
}

我们仍然以非公平锁为例,非公平锁 NonfairSync的writerShouldBlock()始终返回false

image-20210913200450598

写锁占低16位,读锁占高16位,t1获取写锁成功 state值为0_1

t2 执行 r.lock,

这时进入读锁的 sync.acquireShared(1) 流程,首先会进入 tryAcquireShared 流程。

public void lock() {
    sync.acquireShared(1);
}
public final void acquireShared(int arg) {
    //返回-1
    if (tryAcquireShared(arg)  0);
        pred.next = node;
    } else {
        /*
         * waitStatus must be 0 or PROPAGATE.  Indicate that we
         * need a signal, but don't park yet.  Caller will need to
         * retry to make sure it cannot acquire before parking.
         */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//waitStatus从0变为-1
    }
    return false;
}

再 for (; ; ) 循环一次尝试 tryAcquireShared(1) 如果还不成功,那么在 parkAndCheckInterrupt() 处 park

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {//第二次循环
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);//仍然获取锁失败
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            //哨兵节点的waitStatus=-1返回true
            if (shouldParkAfterFailedAcquire(p, node) &&
                //阻塞t2节点
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);//t2被park阻塞在这里
    return Thread.interrupted();
}

t2获取读锁到被阻塞的过程中一共tryAcquireShared()尝试获取读锁3次

image-20210913200559277

t3 r.lock,t4 w.lock t3 r.lock

和t2 r.lock过程类似,除了t3节点的前驱节点是t2

t4 w.lock
public void lock() {
    sync.acquire(1);
}
public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}
tryAcquire(arg)
protected final boolean tryAcquire(int acquires) {
    /*
     * Walkthrough:
     * 1. If read count nonzero or write count nonzero
     *    and owner is a different thread, fail.
     * 2. If count would saturate, fail. (This can only
     *    happen if count is already nonzero.)
     * 3. Otherwise, this thread is eligible for lock if
     *    it is either a reentrant acquire or
     *    queue policy allows it. If so, update state
     *    and set owner.
     */
    Thread current = Thread.currentThread();//t4
    int c = getState();//0_1
    int w = exclusiveCount(c);//写锁数量
    //state!=0 加了写锁或读锁
    if (c != 0) {
        // (Note: if c != 0 and w == 0 then shared count != 0)
        //写锁=0 表示加了读锁 或者 当前线程不是持有锁的线程
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;//返回false
        //上面条件不满足
        //写锁数量+1 超过了写锁的最大数
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
        // Reentrant acquire
        //status= c+1  重入
        setState(c + acquires);
        return true;
    }
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}
addWaiter(Node.EXCLUSIVE), arg)
private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);//创建独占模式的t4节点
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;//tail尾指针指向的是t3节点
    if (pred != null) {
        node.prev = pred;//t4的前指针指向t3节点
        if (compareAndSetTail(pred, node)) {//tail尾指针指向t4节点
            pred.next = node;//t3的后指针指向t4节点
            return node;
        }
    }
    enq(node);//t4节点入队
    return node;
}

这种状态下,假设又有 t3 加读锁和 t4 加写锁,这期间 t1 仍然持有锁,就变成了下面的样子

image-20210913200639176

t1 w.unlock

这时会走到写锁的 sync.release(1) 流程,

public void unlock() {
    sync.release(1);
}
public final boolean release(int arg) {
    if (tryRelease(arg)) {//true
        Node h = head;
        //哨兵节点不为null 并且waitStatus 不为0
        if (h != null && h.waitStatus != 0)//true
            unparkSuccessor(h);
        return true;
    }
    return false;
}
tryRelease(arg)
protected final boolean tryRelease(int releases) {
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();
    int nextc = getState() - releases;//1-1 = 0
    boolean free = exclusiveCount(nextc) == 0;//写锁 =0
    //true
    if (free)
        setExclusiveOwnerThread(null);//设置锁持有者为null
    setState(nextc);//state = 0
    return free;//返回true
}

调用 sync.tryRelease(1) 成功,变成下面的样子

image-20210913200709898

unparkSuccessor(h)
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;//哨兵节点的waitStatus = -1
    if (ws  0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus = 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())//返回到这里 执行下次循环
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

这回再来一次 for (;; ) 执行 tryAcquireShared 成功则让读锁计数加一

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();//前驱节点:哨兵节点
            if (p == head) {//true
                int r = tryAcquireShared(arg);//再次尝试获取读锁,如果读锁获取成功 
                if (r >= 0) {
                    setHeadAndPropagate(node, r);//
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
tryAcquireShared(arg)
protected final int tryAcquireShared(int unused) {
    /*
     * Walkthrough:
     * 1. If write lock held by another thread, fail.
     * 2. Otherwise, this thread is eligible for
     *    lock wrt state, so ask if it should block
     *    because of queue policy. If not, try
     *    to grant by CASing state and updating count.
     *    Note that step does not check for reentrant
     *    acquires, which is postponed to full version
     *    to avoid having to check hold count in
     *    the more typical non-reentrant case.
     * 3. If step 2 fails either because thread
     *    apparently not eligible or CAS fails or count
     *    saturated, chain to version with full retry loop.
     */
    Thread current = Thread.currentThread();//t2
    int c = getState();//0
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);//读锁的数量
    //判断读锁应不应该被阻塞  这是一种避免写锁饥饿的机制
    if (!readerShouldBlock() &&
        //读锁的数量小于读写最大数
        r  0
    if (propagate > 0 || h == null || h.waitStatus = 0) {
                    setHeadAndPropagate(node, r);//将
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
protected final int tryAcquireShared(int unused) {
    /*
     * Walkthrough:
     * 1. If write lock held by another thread, fail.
     * 2. Otherwise, this thread is eligible for
     *    lock wrt state, so ask if it should block
     *    because of queue policy. If not, try
     *    to grant by CASing state and updating count.
     *    Note that step does not check for reentrant
     *    acquires, which is postponed to full version
     *    to avoid having to check hold count in
     *    the more typical non-reentrant case.
     * 3. If step 2 fails either because thread
     *    apparently not eligible or CAS fails or count
     *    saturated, chain to version with full retry loop.
     */
    Thread current = Thread.currentThread();//t3
    int c = getState();//1(1_0)
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);//1
    if (!readerShouldBlock() &&
        //读锁数量小于最大数
        r  0 || h == null || h.waitStatus             
关注
打赏
1638062488
查看更多评论
0.0443s