您当前的位置: 首页 >  ar

顧棟

暂无认证

  • 1浏览

    0关注

    227博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

【JUC系列】同步工具类之CyclicBarrier

顧棟 发布时间:2022-05-03 06:00:00 ,浏览量:1

同步屏障 CyclicBarrier

文章目录
  • 同步屏障 CyclicBarrier
    • 示例
    • 核心思想
    • 组成
      • 构造函数
      • 内部类 Generation
      • 成员变量
      • 核心方法
        • dowait(boolean timed, long nanos)
        • nextGeneration()
        • breakBarrier()
"循环屏障"是一种同步辅助工具,它允许一组线程相互等待以达到共同的屏障点。 CyclicBarriers 在涉及固定大小的线程组的程序中很有用,这些线程组必须偶尔相互等待。 屏障被称为循环的,因为它可以在等待线程被释放后重新使用。

CyclicBarrier 支持一个可选的 Runnable 命令,该命令在每个屏障点运行一次,在队伍中的最后一个线程到达之后,但在任何线程被释放之前。 此屏障操作对于在任何一方继续之前更新共享状态很有用。 在这里插入图片描述

示例
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.*;

/**
 * 分布计算再合并
 */
public class CyclicBarrierDemo2 implements Runnable {

    /**
     * 当最后一个线程到达屏障点,执行此方法,汇总各个线程的值
     */
    @Override
    public void run() {
        int result = 0;
        // 遍历
        for (Map.Entry ss : s.entrySet()) {
            result += ss.getValue();
        }
        s.put("result", result);
        System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] count value [" + result + "].");
    }

    /**
     * 定义一个需要4个线程的同步屏障,并在最后一个到达屏障的线程后,执行当前类的run方法
     */
    private final CyclicBarrier c = new CyclicBarrier(4, this);

    /**
     * 4个线程的线程池
     */
    private final Executor executor = Executors.newFixedThreadPool(4);

    /**
     * 用来存放各个线程运算结果的值
     */
    private final ConcurrentHashMap s = new ConcurrentHashMap();

    /**
     * 计算api 4个线程分别向变量s中塞值
     */
    private void count() {
        for (int i = 0; i  {
                System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] put value [" + finalI + "] and await.");
                s.put(Thread.currentThread().getName(), finalI);
                try {
                    c.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            });
        }
    }

    public static void main(String[] args) {
        CyclicBarrierDemo2 cyclicBarrierDemo2 = new CyclicBarrierDemo2();
        cyclicBarrierDemo2.count();
    }
}

执行结果

[16:51:15--pool-1-thread-1] put value [0] and await.
[16:51:15--pool-1-thread-3] put value [2] and await.
[16:51:15--pool-1-thread-4] put value [3] and await.
[16:51:15--pool-1-thread-2] put value [1] and await.
[16:51:15--pool-1-thread-2] count value [6].
核心思想

内部组合了非公平策略的重入锁,借助AQS实现线程的阻塞和唤醒,主要依赖条件队列。关于AQS的Condition。

组成 构造函数
    // 创建一个新的 CyclicBarrier,它将在给定数量的参与方(线程)等待它时触发,并且在触发障碍时不执行预定义的操作。 parties代表线程数
	public CyclicBarrier(int parties) {
        this(parties, null);
    }

	// 创建一个新的 CyclicBarrier,当给定数量的参与方(线程)正在等待它时,它将触发,并且当障碍被触发时,它将执行给定的屏障动作,由最后一个进入屏障的线程执行。barrierAction代表当屏障被触发时执行的命令,如果没有动作则为 null
    public CyclicBarrier(int parties, Runnable barrierAction) {
        // 参与者必须大于0
        if (parties  0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    // 若当前线程发生中断,需要损坏当代屏障
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }
                // 再次检查屏障是否损坏
                if (g.broken)
                    throw new BrokenBarrierException();
                // 代数不对 返回下标
                if (g != generation)
                    return index;

                // 设置等待时间,并且已经过了超时时间 损坏屏障 抛出超时异常
                if (timed && nanos             
关注
打赏
1663402667
查看更多评论
0.0420s