您当前的位置: 首页 >  Java

止步前行

暂无认证

  • 0浏览

    0关注

    247博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Java并发编程——CountDownLatch和CyclicBarrier

止步前行 发布时间:2019-03-23 12:38:58 ,浏览量:0

一、引言

在Java 1.5中,提供了一些非常有用的辅助类来帮助我们进行并发编程,比如CountDownLatch,CyclicBarrier、Semaphore。下面就来学习一下这三个辅助类的用法,若有不正之处请多多谅解。

二、CountDownLatch 1、CountDownLatch 是什么呢?

CountDownLatch 在Java1.5中被引入的,跟它一起被引入的并发工具类还有CyclicBarrier、Semaphore、ConcurrentHashMap和BlockingQueue。它们都存在于java.util.concurrent包下。CountDownLatch 这个类能够使一个线程等待其他线程完成各自的工作后再执行。 利用它可以实现类似计数器的功能,比如有一个任务A,它要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现这种功能了。

2、CountDownLatch 原理

CountDownLatch 是通过一个计数器来实现的,计数器的初始值为线程的数量。这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重新设置这个计数值。每当一个线程完成了自己的任务后,计数器的值就会减1。当计数器值到达0时,它表示所有的线程已经完成了任务,然后在调用await()方法的等待线程就可以恢复执行任务。下面是CountDownLatch工作的伪代码:

//Main thread start
//Create CountDownLatch for N threads
//Create and start N threads
//Main thread wait on latch
//N threads completes there tasks are returns
//Main thread resume execution
3、CountDownLatch 用法

CountDownLatch 类只提供了一个构造器:

public CountDownLatch(int count) {  };  //参数count为计数值,实际上就是闭锁需要等待的线程数量。

下面这3个方法是CountDownLatch类中最重要的方法:

//调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行,除非线程被中断
public void await() throws InterruptedException { };   

//和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };  

//调用此方法则将count值减1
public void countDown() { }; 

//得到当前的计数
public Long getCount();          
4、CountDownLatch 实例
public class CountDownLatchExample {

	public static void main(String[] args) {
		final CountDownLatch latch = new CountDownLatch(2);

		new Thread("T1") {
			public void run() {
				try {
					System.out.println("子线程" + Thread.currentThread().getName() + "正在执行");
					Thread.sleep(1000);
					System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕");
					latch.countDown();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			};
		}.start();

		new Thread("T2") {
			public void run() {
				try {
					System.out.println("子线程" + Thread.currentThread().getName() + "正在执行");
					Thread.sleep(3000);
					System.out.println("子线程" + Thread.currentThread().getName() + "执行完毕");
					latch.countDown();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			};
		}.start();

		try {
			System.out.println("等待2个子线程执行完毕...");
			latch.await();
			System.out.println("2个子线程已经执行完毕");
			System.out.println("继续执行主线程");
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}

测试结果: 在这里插入图片描述 说明:线程T1休眠了1秒,线程T2休眠了3秒,但T1执行完后,还要等T2线程执行完之后,主线程才能继续往下执行。

5、小节

与CountDownLatch的第一次交互是主线程等待其他线程。主线程必须在启动其他线程后立即调用CountDownLatch.await()方法。这样主线程的操作就会在这个方法上阻塞,直到其他线程完成各自的任务。

其他 N 个线程必须引用闭锁对象,因为他们需要通知 CountDownLatch 对象,他们已经完成了各自的任务。这种通知机制是通过 CountDownLatch.countDown()方法来完成的;每调用一次这个方法,在构造函数中初始化的count值就减1。所以当N个线程都调 用了这个方法,count的值等于0,然后主线程就能通过await()方法,恢复执行自己的任务。

三、CyclicBarrier

英文单词的字面意思:回环栅栏。通过它可以实现让一组线程等待至某个状态之后再全部同时执行。之所以叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做barrier,当调用await()方法之后,线程就处于barrier了。

1、CyclicBarrier 介绍

CyclicBarrier类位于java.util.concurrent包下,CyclicBarrier提供2个构造器:

//参数parties指让多少个线程或者任务等待至barrier状态,参数barrierAction为当这些线程都达到barrier状态时会执行的内容,
//先执行barrierAction,然后在释放所有线程
public CyclicBarrier(int parties, Runnable barrierAction) {
}
 
public CyclicBarrier(int parties) {
}
2、方法介绍

CyclicBarrier中最重要的方法就是 await 方法,它有2个重载版本:

	public int await() throws InterruptedException, BrokenBarrierException { };
	
	public int await(long timeout, TimeUnit unit) throws InterruptedException,
											BrokenBarrierException,TimeoutException { };
											
	public void reset()

第一个版本比较常用,用来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务;

第二个版本是让这些线程等待至一定的时间,如果还有线程没有到达barrier状态就直接让到达barrier的线程执行后续任务。 下面举几个例子就明白了;

reset 方法将屏障重置为其初始状态。如果所有参与者目前都在屏障处等待,则它们将返回,同时抛出一个BrokenBarrierException。

3、CyclicBarrier实例

假若有若干个线程都要进行写数据操作,并且只有所有线程都完成写数据操作之后,这些线程才能继续做后面的事情。

(1)、一个参数的构造函数
public class CyclicBarrierExample {

	public static void main(String[] args) {

		final CyclicBarrier barrier = new CyclicBarrier(3);

		new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");
				try {
					Thread.sleep(2000); // 以睡眠来模拟写入数据操作
					System.out.println("线程" + Thread.currentThread().getName() 
							+ "写入数据完毕,等待其他线程写入完毕");
					barrier.await();
				} catch (InterruptedException e) {
					e.printStackTrace();
				} catch (BrokenBarrierException e) {
					e.printStackTrace();
				}
				System.out.println("所有线程写入完毕,继续处理其他任务...");
			}
		}).start();

		new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");
				try {
					Thread.sleep(4000); // 以睡眠来模拟写入数据操作
					System.out.println("线程" + Thread.currentThread().getName() 
							+ "写入数据完毕,等待其他线程写入完毕");
					barrier.await();
				} catch (InterruptedException e) {
					e.printStackTrace();
				} catch (BrokenBarrierException e) {
					e.printStackTrace();
				}
				System.out.println("所有线程写入完毕,继续处理其他任务...");
			}
		}).start();

		new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");
				try {
					Thread.sleep(6000); // 以睡眠来模拟写入数据操作
					System.out.println("线程" + Thread.currentThread().getName() 
							+ "写入数据完毕,等待其他线程写入完毕");
					barrier.await();
				} catch (InterruptedException e) {
					e.printStackTrace();
				} catch (BrokenBarrierException e) {
					e.printStackTrace();
				}
				System.out.println("所有线程写入完毕,继续处理其他任务...");
			}
		}).start();
	}
}

测试结果: 在这里插入图片描述 结果说明:第一个线程休息2秒,第二个线程休息4秒,第三个线程休息6秒,但每个线程中的最后一句System.out.println,都会等到第三个线程休息完,才会同时执行。

(2)、二个参数的构造函数

从上面输出结果可以看出,每个写入线程执行完写数据操作之后,就在等待其他线程写入操作完毕。当所有线程写入操作完毕之后,所有线程就继续进行后续的操作了,如果说想在所有线程写入操作完之后,进行额外的其他操作可以为 CyclicBarrier 提供Runnable参数。

public class CyclicBarrierExample {

	public static void main(String[] args) {

		final CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
			@Override
			public void run() {
				System.out.println("所有线程执行完毕,回调此处");
				System.out.println("当前线程:" + Thread.currentThread().getName() + "");
			}
		});

		new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");
				try {
					Thread.sleep(2000); // 以睡眠来模拟写入数据操作
					System.out.println("线程" + Thread.currentThread().getName() 
							+ "写入数据完毕,等待其他线程写入完毕");
					barrier.await();
				} catch (InterruptedException e) {
					e.printStackTrace();
				} catch (BrokenBarrierException e) {
					e.printStackTrace();
				}
				System.out.println("所有线程写入完毕,继续处理其他任务...");
			}
		}).start();

		new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");
				try {
					Thread.sleep(4000); // 以睡眠来模拟写入数据操作
					System.out.println("线程" + Thread.currentThread().getName() 
							+ "写入数据完毕,等待其他线程写入完毕");
					barrier.await();
				} catch (InterruptedException e) {
					e.printStackTrace();
				} catch (BrokenBarrierException e) {
					e.printStackTrace();
				}
				System.out.println("所有线程写入完毕,继续处理其他任务...");
			}
		}).start();

		new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");
				try {
					Thread.sleep(6000); // 以睡眠来模拟写入数据操作
					System.out.println("线程" + Thread.currentThread().getName() 
							+ "写入数据完毕,等待其他线程写入完毕");
					barrier.await();
				} catch (InterruptedException e) {
					e.printStackTrace();
				} catch (BrokenBarrierException e) {
					e.printStackTrace();
				}
				System.out.println("所有线程写入完毕,继续处理其他任务...");
			}
		}).start();
	}
}

测试结果: 在这里插入图片描述 结果说明:从结果可以看出,当三个线程都到达barrier状态后,会从三个线程中选择一个线程去执行Runnable。

(3)、为await指定时间
public class CyclicBarrierExample {

	public static void main(String[] args) {

		final CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
			@Override
			public void run() {
				System.out.println("所有线程执行完毕,回调此处");
				System.out.println("当前线程:" + Thread.currentThread().getName() + "");
			}
		});

		new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");
				try {
					Thread.sleep(2000); // 以睡眠来模拟写入数据操作
					System.out.println("线程" + Thread.currentThread().getName() 
							+ "写入数据完毕,等待其他线程写入完毕");
					try {
						barrier.await(2000, TimeUnit.MILLISECONDS);
					} catch (TimeoutException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				} catch (InterruptedException e) {
					e.printStackTrace();
				} catch (BrokenBarrierException e) {
					e.printStackTrace();
				}
				System.out.println("所有线程写入完毕,继续处理其他任务...");
			}
		}).start();

		new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");
				try {
					Thread.sleep(4000); // 以睡眠来模拟写入数据操作
					System.out.println("线程" + Thread.currentThread().getName() 
							+ "写入数据完毕,等待其他线程写入完毕");
					barrier.await();
				} catch (InterruptedException e) {
					e.printStackTrace();
				} catch (BrokenBarrierException e) {
					e.printStackTrace();
				}
				System.out.println("所有线程写入完毕,继续处理其他任务...");
			}
		}).start();

		new Thread(new Runnable() {
			@Override
			public void run() {
				System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");
				try {
					Thread.sleep(6000); // 以睡眠来模拟写入数据操作
					System.out.println("线程" + Thread.currentThread().getName() 
							+ "写入数据完毕,等待其他线程写入完毕");
					barrier.await();
				} catch (InterruptedException e) {
					e.printStackTrace();
				} catch (BrokenBarrierException e) {
					e.printStackTrace();
				}
				System.out.println("所有线程写入完毕,继续处理其他任务...");
			}
		}).start();
	}
}

测试结果: 在这里插入图片描述 结果说明:第一个线程await()方法,只能等待2秒,该线程的时间到了,就会抛出一个TimeoutException异常,并继续执行后面的任务。

(4)、CyclicBarrier可重用

public class CyclicBarrierExample2 {

	public static void main(String[] args) {
		int N = 4;
		CyclicBarrier barrier = new CyclicBarrier(N);

		for (int i = 0; i             
关注
打赏
1657848381
查看更多评论
0.0588s