您当前的位置: 首页 >  ar

庄小焱

暂无认证

  • 1浏览

    0关注

    805博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

JDK源码——CountDownLatch/CyclicBarrier/Semaphore类

庄小焱 发布时间:2021-11-26 21:48:08 ,浏览量:1

摘要

讲解CountDownLatch,CyclicBarrier和Semaphore这三个并发包里面的辅助类。

CountDownLatch原理

CountDownLatch 是共享锁的一种实现,它默认构造 AQS 的 state 值为 count。当线程使用 countDown() 方法时,其实使用了tryReleaseShared方法以 CAS 的操作来减少 state,直至 state 为 0 。当调用 await() 方法的时候,如果 state 不为 0,那就证明任务还没有执行完毕,await() 方法就会一直阻塞,也就是说 await() 方法之后的语句不会被执行。然后,CountDownLatch 会自旋 CAS 判断 state == 0,如果 state == 0 的话,就会释放所有等待的线程,await() 方法之后的语句得到执行。

 让一些线程阻塞直到另一些线程完成一系列操作后才被唤醒,CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,调用线程会被阻塞。其它线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞),当计数器的值变为零时,因调用await方法被阻塞的线程会被唤醒,继续执行。

CountDownLatch类只提供了一个构造器:

public CountDownLatch(int count) {  };  //参数count为计数值

CountDownLatch类中最重要的方法:

//调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行

public void await() throws InterruptedException { }; 
 
//和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行

public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };

//将count值减1 
public void countDown() { };  

构造器中的计数值(count)实际上就是闭锁需要等待的线程数量。这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重新设置这个计数值。

与CountDownLatch的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用CountDownLatch.await()方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。其他N 个线程必须引用闭锁对象,因为他们需要通知CountDownLatch对象,他们已经完成了各自的任务。这种通知机制是通过 CountDownLatch.countDown()方法来完成的;每调用一次这个方法,在构造函数中初始化的count值就减1。所以当N个线程都调 用了这个方法,count的值等于0,然后主线程就能通过await()方法,恢复执行自己的任务。

package com.zhuangxiaoyan.jdk.juc.JucLock;

import java.util.concurrent.CountDownLatch;

/**
 * @Classname CountDownLatchDemo
 * @Description TODO
 * @Date 2021/11/26 21:20
 * @Created by xjl
 */
public class CountDownLatchDemo {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(6);

        for (int i = 1; i  {
                System.out.println(Thread.currentThread().getName() + "\t 上完自习室 离开教室");
                countDownLatch.countDown();
            }, String.valueOf(i)).start();
        }
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName() + "\t……班长最后走人");
    }
}
package com.zhuangxiaoyan.jdk.juc.JucLock;

import java.util.concurrent.CountDownLatch;

/**
 * @Classname CountDownLatchDemo
 * @Description TODO
 * @Date 2021/11/26 21:20
 * @Created by xjl
 */
public class CountDownLatchDemo {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(6);

        for (int i = 1; i  {
                System.out.println(Thread.currentThread().getName() + "\t 国 ,被灭");
                countDownLatch.countDown();
            }, Countryenum.forEach_counttryEnum(i).getRetMessage()).start();
        }
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName() + "\t……秦帝国统一天下");
    }
}



package com.zhuangxiaoyan.jdk.juc.JucLock;

import lombok.Getter;

/**
 * @Classname Countryenum
 * @Description TODO
 * @Date 2021/11/26 21:33
 * @Created by xjl
 */
public enum Countryenum {

    ONE(1, "齐"), TWO(2, "楚"), THREE(3, "燕"), FOUR(4, "赵"), FIVE(5, "魏"), SIX(6, "韩");
    @Getter
    private Integer retcode;
    @Getter
    private String retMessage;

    Countryenum(Integer retcode, String retMessage) {
        this.retcode = retcode;
        this.retMessage = retMessage;
    }

    public static Countryenum forEach_counttryEnum(int index){
        Countryenum[] values = Countryenum.values();
        for (Countryenum element:values){
            if (index==element.getRetcode()){
                return element;
            }
        }
        return null;
    }
}
CyclicBarrier原理

CountDownLatch 的实现是基于AQS的,CycliBarrier 是基于 ReentrantLock(ReentrantLock 也属于 AQS 同步器)和 Condition 的。

通过它可以实现让一组线程等待至某个状态之后再全部同时执行,CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。

package com.zhuangxiaoyan.jdk.juc.JucLock;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

/**
 * @Classname CyclicBarrierDemo
 * @Description TODO
 * @Date 2021/11/26 21:49
 * @Created by xjl
 */
public class CyclicBarrierDemo {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier=new CyclicBarrier(7,()->{
            System.out.println("^^^^^^^召唤神龙");
        });
        for (int i=1;i{
                System.out.println(Thread.currentThread().getName()+"\t 收集到了第"+Temp+"龙珠");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                }
            },String.valueOf(i)).start();
        }
    }
}

CyclicBarrier的应用场景

  • CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。比如我们用一个Excel保存了用户所有银行流水,每个Sheet保存一个帐户近一年的每笔银行流水,现在需要统计用户的日均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后,再用barrierAction用这些线程的计算结果,计算出整个Excel的日均银行流水。

CyclicBarrier和CountDownLatch的区别

  • CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。
  • CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。
Semaphore原理

Semaphore 有两种模式,公平模式和非公平模式。

  • 公平模式: 调用 acquire() 方法的顺序就是获取许可证的顺序,遵循 FIFO;
  • 非公平模式: 抢占式的。

我们可以发现Semaphore提供了两种模式的锁机制,一种是公平模式,一种是非公平模式,公平模式其实就是如果发现了有线程在排队等待,则自觉到后面去排队,而非公平模式则不一样,它不管你有没有在排队的线程,谁先抢到是谁的,说到这里我们发现上例子中当声明Semaphore时,其实默认使用了非公平模式NonfairSync,指定了信号量数量为1个,其实它内部Sync中调用了AQSsetState方法,设置同步器状态state为1,详细如下图所示:

 当第一个线程提交任务到线程池时,它会先经过semaphore.acquire()方法来进行获得一个许可操作

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

我们可以看到它调用了sync.acquireSharedInterruptibly(1)方法,这个snyc其实是Semaphore内部类Sync的实例对象,那么问题来了,这个sync变量是什么时候初始化的呢?其实当我们初始化Semaphore,就已经将sync变量初始化了,接下来我们看一下Semaphore构造函数:

//构造函数一:初始化信号量数量为permits个,并采用非公平模式
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
//构造函数二:如果指定fair为true,则采用公平模式,如果指定为false,则采用非公平模式,并且初始化信号量数量为permits个。
public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
方法名描述acquire()尝试获得一个准入许可,如无法获得,则线程等待,直到有线程释放一个许可或当线程被中断。acquire(int permits)尝试获得permits个准入许可,如无法获得,则线程等待,直到有线程释放permits个许可或当线程被中断。acquireUninterruptibly()尝试获得一个准入许可,如无法获得,则线程等待,直到有线程释放一个许可,但是不响应中断请求acquireUninterruptibly(int permits)尝试获得permits个准入许可,如无法获得,则线程等待,直到有线程释放permits个许可,但是不响应中断请求release()用于在线程访问资源结束后,释放一个许可,以使其他等待许可的线程可以进行资源访问。release(int permits)用于在线程访问资源结束后,释放permits个许可,以使其他等待许可的线程可以进行资源访问。tryAcquire()尝试获得一个许可,如果获得许可成功返回true,如果失败则返回fasle,它不会等待,立即返回tryAcquire(int permits)尝试获得permits个许可,如果获得许可成功返回true,如果失败则返回fasle,它不会等待,立即返回tryAcquire(int permits, long timeout, TimeUnit unit)尝试在指定时间内获得permits个许可,如果在指定时间内没有获得许可则则返回false,反之返回truetryAcquire(long timeout, TimeUnit unit)尝试在指定时间内获得一个许可,如果在指定时间内没有获得许可则则返回false,反之返回trueavailablePermits():当前可用的许可数

信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。把它比作是控制流量的红绿灯,比如一条马路要限制流量,只允许同时有一百辆车在这条路上行使,其他的都必须在路口等待,所以前一百辆车会看到绿灯,可以开进这条马路,后面的车会看到红灯,不能驶入马路,但是如果前一百辆中有五辆车已经离开了马路,那么后面就允许有5辆车驶入马路,这个例子里说的车就是线程,驶入马路就表示线程在执行,离开马路就表示线程执行完成,看见红灯就表示线程被阻塞,不能执行。

package com.zhuangxiaoyan.jdk.juc.JucLock;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * @Classname SemaphoreDemo
 * @Description TODO
 * @Date 2021/11/26 21:55
 * @Created by xjl
 */
public class SemaphoreDemo {

    public static void main(String[] args) {
        Semaphore semaphore=new Semaphore(3);

        for (int i=1;i{
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName()+"\t抢到车位");
                    try {
                        TimeUnit.SECONDS.sleep(3);
                    }catch (InterruptedException e){
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName()+"\t停车3秒后离开车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    semaphore.release();
                }
            },String.valueOf(i)).start();
        }
    }
}

Semaphore类位于java.util.concurrent包下,它提供了2个构造器:

public Semaphore(int permits) {
//参数permits表示许可数目,即同时可以允许多少线程进行访问
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) { 
//这个多了一个参数fair表示是否是公平的,即等待时间越久的越先获取许可
    sync = (fair)? new FairSync(permits) : new NonfairSync(permits);
}
//获取一个许可 acquire()用来获取一个许可,若无许可能够获得,则会一直等待,直到获得许可。
public void acquire() throws InterruptedException {  } 

//获取permits个许可
public void acquire(int permits) throws InterruptedException { } 

//释放一个许可 release()用来释放许可。注意,在释放许可之前,必须先获获得许可。
public void release() { }          

//释放permits个许可
public void release(int permits) { }    

这4个方法都会被阻塞,如果想立即得到执行结果,可以使用下面几个方法:

//尝试获取一个许可,若获取成功,则立即返回true,若获取失败,则立即返回false

public boolean tryAcquire() { };  

//尝试获取一个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false

public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { };  

//尝试获取permits个许可,若获取成功,则立即返回true,若获取失败,则立即返回false

public boolean tryAcquire(int permits) { };

//尝试获取permits个许可,若在指定的时间内获取成功,则立即返回true,否则则立即返回false

public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { }; 
参考博文

AQS、Semaphore、CountDownLatch与CyclicBarrier原理及使用方法_如何心安理得的在老板眼皮下摸鱼-CSDN博客_aqs java

、图解Semaphore信号量之AQS共享锁-非公平模式_wanghao112956的博客-CSDN博

关注
打赏
1657692713
查看更多评论
立即登录/注册

微信扫码登录

0.0387s