您当前的位置: 首页 > 

顧棟

暂无认证

  • 1浏览

    0关注

    227博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

【JUC系列】同步工具类之Phaser

顧棟 发布时间:2022-05-05 06:00:00 ,浏览量:1

Phaser

文章目录
  • Phaser
    • 主要机制
      • Registration(注册机制)
      • Synchronization(同步机制)
        • Arrival(到达机制)
        • Waiting(等待机制)
      • Termination(终止机制)
      • Tiering(分层结构)
      • Monitoring(监控)
    • 组成
      • 内部类QNode
      • 成员变量
      • 构造函数
      • 核心方法
        • 方法说明
    • 案例
      • 动态修改线程数+重入性(单个Phaser多个阶段)
可重用的同步屏障,功能类似于 CyclicBarrier 和 CountDownLatch,但支持更灵活的使用。

主要机制 Registration(注册机制)

与其他屏障的情况不同,在Phaser上注册同步的线程数量可能会随着时间而变化。 可以随时注册任务(使用registerbulkRegister或建立初始所需线程数量的构造函数),并且可以选择在任何到达时取消注册(使用arriveAndDeregister)。 与大多数基本同步结构一样,注册和注销仅影响内部计数; 他们没有建立任何进一步的内部簿记,因此任务无法查询它们是否已注册。 (但是,您可以通过子类化此类来引入此类簿记。)

动态性-支持屏障所需线程的数量

Synchronization(同步机制)

CyclicBarrier一样,Phaser可能会被反复等待。 方法arriveAndAwaitAdvance具有类似于CyclicBarrier.await的效果。 每一代Phaser都有一个相关的阶段编号(phase)。 阶段编号从零开始,当所有线程到达屏障时前进,在达到Integer.MAX_VALUE后重置0。 阶段编号的使用可以通过任何注册方可以调用的两种方法,在到达阶段和等待其他阶段时独立控制操作:

重入性-一个Phaser可以支持多个阶段同步操作

Arrival(到达机制)

方法arrive和到arriveAndDeregister记录到达。 这些方法不会阻塞,而是返回相关的到达阶段编号; 即,到达应用的Phaser的相位编号。 当给定阶段的最后一方到达时,将执行一个可选操作并且该阶段前进。 这些动作由触发阶段推进的一方执行,并通过覆盖方法onAdvance(int, int) 进行安排,该方法也支持控制终止。 覆盖此方法与向CyclicBarrier提供屏障操作类似,但更灵活。

Waiting(等待机制)

方法awaitAdvance需要一个指示到达阶段编号的参数,并在Phaser前进到(或已经处于)不同阶段时返回。 与使用CyclicBarrier的类似构造不同,即使等待线程被中断,方法awaitAdvance也会继续等待。 可中断和超时版本也可用,但在任务等待中断或超时时遇到的异常不会改变Phaser的状态。 如有必要,您可以在这些异常的处理程序中执行任何相关的恢复,通常是在调用forceTermination之后。 在ForkJoinPool中执行的任务也可以使用Phaser,这将确保在其他人被阻塞等待阶段推进时执行任务有足够的并行性。

Termination(终止机制)

Phaser可以进入终止状态,可以使用方法isTerminated进行检查。 终止后,所有同步方法立即返回,无需等待提前,返回值为负数。 同样,在终止时尝试注册也无效。 当onAdvance的调用返回 true 时触发终止。 如果取消注册导致注册方的数量变为零,则默认实现返回 true。 如下图所示,当Phaser控制具有固定迭代次数的动作时,通常可以方便地覆盖此方法以在当前阶段数达到阈值时导致终止。 方法forceTermination也可用于突然释放等待线程并允许它们终止。

Tiering(分层结构)

Phaser可以分层(即以树型结构构建)以减少争用。 可以改为设置具有大量参与方的Phaser,否则这些Phaser将经历沉重的同步争用成本,以便子Phaser组共享一个共同的父级。 这可能会大大增加吞吐量,即使它会产生更大的每次操作开销。

在分层Phaser树中,自动管理子移相器与其父级的注册和注销。 每当子移相器的注册方数量变为非零时(如在Phaser(Phaser, int)构造函数、registerbulkRegister中建立的那样),子Phaser将向其父移相器注册。 每当注册方的数量由于调用到达和取消注册而变为零时,子Phaser就会从其父Phaser中注销。

层次性-将多个Phaser树形结构组织起来,通过牺牲操作的开销增加吞吐量。

Monitoring(监控)

虽然同步方法只能由注册方调用,但Phaser的当前状态可以由任何调用者监视。 在任何给定时刻,总共有getRegisteredParties方,其中getArrivedParties已到达当前阶段 (getPhase)。 当剩余的 (getUnarrivedParties) 方到达时,阶段前进。 这些方法返回的值可能反映瞬态状态,因此通常对同步控制没有用处。 方法toString以一种便于非正式监控的形式返回这些状态查询的快照。

组成 内部类QNode
	// 代表等待队列的 Treiber 堆栈的等待节点
	static final class QNode implements ForkJoinPool.ManagedBlocker {
        final Phaser phaser;
        final int phase;
        final boolean interruptible;
        final boolean timed;
        boolean wasInterrupted;
        long nanos;
        final long deadline;
        volatile Thread thread; // nulled to cancel wait
        QNode next;

        QNode(Phaser phaser, int phase, boolean interruptible,
              boolean timed, long nanos) {
            this.phaser = phaser;
            this.phase = phase;
            this.interruptible = interruptible;
            this.nanos = nanos;
            this.timed = timed;
            this.deadline = timed ? System.nanoTime() + nanos : 0L;
            thread = Thread.currentThread();
        }

        public boolean isReleasable() {
            if (thread == null)
                return true;
            if (phaser.getPhase() != phase) {
                thread = null;
                return true;
            }
            if (Thread.interrupted())
                wasInterrupted = true;
            if (wasInterrupted && interruptible) {
                thread = null;
                return true;
            }
            if (timed) {
                if (nanos > 0L) {
                    nanos = deadline - System.nanoTime();
                }
                if (nanos  0L)
                LockSupport.parkNanos(this, nanos);
            return isReleasable();
        }
    }
成员变量
// 同步状态位
private volatile long state;
// 此移相器的父级,如果没有则为 null
private final Phaser parent;
// 移相器树的根。 如果不在树中,则等于此值。
private final Phaser root;
// 偶数阶段单向链表
private final AtomicReference evenQ;
// 奇数阶段单向链表
private final AtomicReference oddQ;

state状态位的含义,如下图 在这里插入图片描述 所有状态更新都通过 CAS 执行,除了子相位器的初始注册(即具有非 null 父级的子相位器)。在这种(相对罕见的)情况下,我们在首次向其父级注册时使用内置同步来锁定。子相位器的相位可以滞后于其祖先的相位,直到它被实际访问——参见方法 reconcileState。

Phaser中使用奇偶两个单向链表来实现阻塞队列,降低操作的冲突。

构造函数

使用给定的父级和已注册的未到达方数量创建一个新的移相器。 当给定的父节点不为空并且给定的参与方数量大于零时,此子移相器将向其父节点注册。

    /**
     * @param parent  父移相器
     * @param parties 进入下一阶段所需的参与方数量
     */
	public Phaser(Phaser parent, int parties) {
        // 无符号右移16位--验证parties是否在合理范围内
        if (parties >>> PARTIES_SHIFT != 0)
            throw new IllegalArgumentException("Illegal number of parties");
        // 阶段0
        int phase = 0;
        this.parent = parent;
        if (parent != null) {
            // 获取父阶段的根节点
            final Phaser root = parent.root;
            this.root = root;
            // 偶数head
            this.evenQ = root.evenQ;
            // 奇数head
            this.oddQ = root.oddQ;
            // 成员量不为0,需要注册一个阶段
            if (parties != 0)
                phase = parent.doRegister(1);
        }
        // 不存在实际父阶段器
        else {
            this.root = this;
            this.evenQ = new AtomicReference();
            this.oddQ = new AtomicReference();
        }
        // 设置同步状态值--最终二进制位数含义和分布,如上图所示。
        this.state = (parties == 0) ? (long)EMPTY :
            ((long)phase > PHASE_SHIFT);
            // 此时年代小于0 说明年代取消或损坏 不能再上面注册参与者线程,跳出无限循环
            if (phase  PHASE_SHIFT)) !=
               (int)(s >>> PHASE_SHIFT) &&
               !UNSAFE.compareAndSwapLong
               (this, stateOffset, s,
                s = (((long)phase  PARTIES_SHIFT) == 0) ? EMPTY :
                       ((s & PARTIES_MASK) | p))))))
            s = state;
    }
    return s;
}

internalAwaitAdvance(int phase, QNode node)

阻塞等待phase到下一代

    private int internalAwaitAdvance(int phase, QNode node) {
        // assert root == this;
        // 释放上一代
        releaseWaiters(phase-1);          // ensure old queue clean
        boolean queued = false;           // true when node is enqueued
        int lastUnarrived = 0;            // to increase spins upon change
        int spins = SPINS_PER_ARRIVAL;
        long s;
        int p;
        while ((p = (int)((s = state) >>> PHASE_SHIFT)) == phase) {
            if (node == null) {           // spinning in noninterruptible mode
                int unarrived = (int)s & UNARRIVED_MASK;
                if (unarrived != lastUnarrived &&
                    (lastUnarrived = unarrived) > PHASE_SHIFT) == phase) // avoid stale enq
                    queued = head.compareAndSet(q, node);
            }
            else {
                try {
                    ForkJoinPool.managedBlock(node);
                } catch (InterruptedException ie) {
                    node.wasInterrupted = true;
                }
            }
        }

        if (node != null) {
            if (node.thread != null)
                node.thread = null;       // avoid need for unpark()
            if (node.wasInterrupted && !node.interruptible)
                Thread.currentThread().interrupt();
            if (p == phase && (p = (int)(state >>> PHASE_SHIFT)) == phase)
                return abortWait(phase); // possibly clean up on abort
        }
        releaseWaiters(phase);
        return p;
    }

releaseWaiters(int phase)

    private void releaseWaiters(int phase) {
        QNode q;   // first element of queue
        Thread t;  // its thread
        // 0是偶数,通过与1进行位与运算判断用那个链表
        AtomicReference head = (phase & 1) == 0 ? evenQ : oddQ;
        // 遍历栈 开始唤醒链表里面的线程,
        while ((q = head.get()) != null &&
               q.phase != (int)(root.state >>> PHASE_SHIFT)) {
            if (head.compareAndSet(q, q.next) &&
                (t = q.thread) != null) {
                q.thread = null;
                LockSupport.unpark(t);
            }
        }
    }

arrive()arriveAndDeregister() 都是表示到达了变相器,都不需要等他其他线程到达后,才能继续执行。主要由doArrive(int adjust)实现。

doArrive(int adjust)

	// 
	private int doArrive(int adjust) {
        // 获取根结点
        final Phaser root = this.root;
        // 无限循环
        for (;;) {
            // 如果没有父结点,那么状态不需要动,否则需要重新协调状态
            long s = (root == this) ? state : reconcileState();
            // 获取阶段数
            int phase = (int)(s >>> PHASE_SHIFT);
            if (phase > PARTIES_SHIFT;
                    // 如果此线程是根结点
                    if (root == this) {
                        // 检查是否终止此phaser
                        if (onAdvance(phase, nextUnarrived))
                            n |= TERMINATION_BIT;
                        // 初始化状态
                        else if (nextUnarrived == 0)
                            n |= EMPTY;
                        else
                            n |= nextUnarrived;
                        // 阶段+1
                        int nextPhase = (phase + 1) & MAX_PHASE;
                        n |= (long)nextPhase > PHASE_SHIFT);
            if (phase >> PARTIES_SHIFT;
                if (onAdvance(phase, nextUnarrived))
                    n |= TERMINATION_BIT;
                else if (nextUnarrived == 0)
                    n |= EMPTY;
                else
                    n |= nextUnarrived;
                int nextPhase = (phase + 1) & MAX_PHASE;
                n |= (long)nextPhase > PHASE_SHIFT); // terminated
                releaseWaiters(phase);
                return nextPhase;
            }
        }
    }

int awaitAdvance(int phase)

    public int awaitAdvance(int phase) {
        final Phaser root = this.root;、
        // 
        long s = (root == this) ? state : reconcileState();
        // 获取阶段数
        int p = (int)(s >>> PHASE_SHIFT);
        // 
        if (phase = 0) {
        // 将根结点的终止标识置为已终止状态
        if (UNSAFE.compareAndSwapLong(root, stateOffset,
                                      s, s | TERMINATION_BIT)) {
            // signal all threads  唤醒所有线程
            releaseWaiters(0); // Waiters on evenQ 偶数链表
            releaseWaiters(1); // Waiters on oddQ  奇数链表
            return;
        }
    }
}
案例 动态修改线程数+重入性(单个Phaser多个阶段)
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class PhaserDemo {

    private static Random random = new Random(System.currentTimeMillis());

    public static void main(String[] args) {
        Phaser phaser = new Phaser(5);

        ExecutorService e = Executors.newFixedThreadPool(5);
        // 第一阶段的 4个子线程
        System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + "go to phase=" + phaser.getPhase() + ".");
        for (int i = 0; i  {
                try {
                    System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + " phase=" + phaser.getPhase() + " start.");
                    TimeUnit.SECONDS.sleep(random.nextInt(5));
                    System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + " phase=" + phaser.getPhase() + " end.");
                    phaser.arriveAndAwaitAdvance();
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            });
        }
        // 第一个阶段的 主线程,第一阶段所需5个线程都满足了。
        phaser.arriveAndAwaitAdvance();
        // 此时phaser的第一个阶段已经完成。phase加1 开始第二阶段开始
        System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + "go to phase=" + phaser.getPhase() + ".");
        // 此时第二阶段只需要6个线程,通过register增加一个线程
        phaser.register();
        // 第二阶段的 5个子线程
        for (int i = 0; i  {
                try {
                    System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + " phase=" + phaser.getPhase() + " start.");
                    TimeUnit.SECONDS.sleep(random.nextInt(5));
                    System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + " phase=" + phaser.getPhase() + " end.");
                    phaser.arriveAndAwaitAdvance();
                } catch (InterruptedException ex) {
                    ex.printStackTrace();
                }
            });
        }
        // 第二阶段的 主线程,第二阶段所需6个线程都满足了。
        phaser.arriveAndAwaitAdvance();
        // 第三阶段的 主线程,第二阶段所需6个线程都满足了。
        System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + "now phase=" + phaser.getPhase() + ", exit.");
        // 强制终止phaser
        phaser.forceTermination();
        System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "-" + Thread.currentThread().getName() + "] " + "Phaser Termination Status is " + phaser.isTerminated() + ", exit.");

        e.shutdown();
    }
}

执行结果

[17:58:26-main] go to phase=0.
[17:58:26-pool-1-thread-1]  phase=0 start.
[17:58:26-pool-1-thread-2]  phase=0 start.
[17:58:26-pool-1-thread-4]  phase=0 start.
[17:58:26-pool-1-thread-3]  phase=0 start.
[17:58:26-pool-1-thread-4]  phase=0 end.
[17:58:26-pool-1-thread-3]  phase=0 end.
[17:58:28-pool-1-thread-1]  phase=0 end.
[17:58:30-pool-1-thread-2]  phase=0 end.
[17:58:30-main] go to phase=1.
[17:58:30-pool-1-thread-4]  phase=1 start.
[17:58:30-pool-1-thread-3]  phase=1 start.
[17:58:30-pool-1-thread-1]  phase=1 start.
[17:58:30-pool-1-thread-2]  phase=1 start.
[17:58:30-pool-1-thread-2]  phase=1 end.
[17:58:30-pool-1-thread-5]  phase=1 start.
[17:58:31-pool-1-thread-4]  phase=1 end.
[17:58:32-pool-1-thread-5]  phase=1 end.
[17:58:33-pool-1-thread-1]  phase=1 end.
[17:58:34-pool-1-thread-3]  phase=1 end.
[17:58:34-main] now phase=2, exit.
[17:58:34-main] Phaser Termination Status is true, exit.

Phaser主要用来解决什么问题?

Phaser与CyclicBarrier和CountDownLatch的区别是什么?

如果用CountDownLatch来实现Phaser的功能应该怎么实现?

Phaser运行机制是什么样的?

关注
打赏
1663402667
查看更多评论
立即登录/注册

微信扫码登录

0.0420s