- 【JUC系列】Executor框架之CompletionService
- CompletionService
- ExecutorCompletionService
- 成员变量
- 内部类QueueingFuture
- 构造函数
- 任务执行
- 使用案例
- 场景一:
- 场景二:
需要优先阅读【JUC系列】Executor框架之FutureTask
CompletionService这是一个接口,提供一种服务,它将新的异步任务的生产与已完成任务的结果的消费分离。生产者提交任务以供执行。消费者接受已完成的任务并按照他们完成的顺序处理他们的结果。
例如,CompletionService 可用于管理异步 I/O,其中执行读取的任务在程序或系统的一个部分中提交,然后在读取完成时在程序的不同部分执行操作,可能在与他们要求的顺序不同。通常,CompletionService 依赖一个单独的 Executor 来实际执行任务,在这种情况下,CompletionService 只管理一个内部完成队列。 ExecutorCompletionService 类提供了这种方法的实现。
内存一致性效果:在将任务提交到 CompletionService 之前线程中的操作发生在该任务所采取的操作之前,这反过来又发生在从相应的 take() 成功返回之后的操作。
接口提供了以下方法
方法名描述Future submit(Callable task)提交一个返回值的任务以供执行,并返回一个表示该任务待处理结果的 Future。 完成后,可以采取或轮询此任务。Future submit(Runnable task, V result)提交 Runnable 任务以执行并返回代表该任务的 Future。 完成后,可以take或poll此任务。Future take() throws InterruptedException检索并删除代表下一个已完成任务的 Future,如果还没有,则等待。Future poll()检索并删除表示下一个已完成任务的 Future,如果不存在,则返回 null。Future poll(long timeout, TimeUnit unit) throws InterruptedException检索并删除代表下一个已完成任务的 Future,如果还没有,则在必要时等待指定的等待时间。 ExecutorCompletionServiceExecutorCompletionService实现了CompletionService。
一个 CompletionService,它使用提供的 Executor 来执行任务。 此类安排提交的任务在完成后放置在使用 take 可访问的队列中。 该类足够轻量级,适合在处理任务组时临时使用。
成员变量// 执行任务的线程池
private final Executor executor;
// ???
private final AbstractExecutorService aes;
// 任务完成会存放在该阻塞队列中
private final BlockingQueue completionQueue;
内部类QueueingFuture
继承了FutureTask
private class QueueingFuture extends FutureTask {
QueueingFuture(RunnableFuture task) {
super(task, null);
this.task = task;
}
// 执行成功,将任务添加到completionQueue队列中。主要在FutureTask执行finishCompletion()时,调用done。
protected void done() { completionQueue.add(task); }
private final Future task;
}
构造函数
参数executor:传入的线程池,用来执行任务;默认的队列是LinkedBlockingQueue。
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue();
}
参数executor:传入的线程池,用来执行任务;
参数completionQueue:用来记录执行结果的阻塞列表
public ExecutorCompletionService(Executor executor,
BlockingQueue completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}
任务执行
public Future submit(Callable task) {
if (task == null) throw new NullPointerException();
RunnableFuture f = newTaskFor(task);
// 将任务转换成QueueingFuture
executor.execute(new QueueingFuture(f));
return f;
}
public Future submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture f = newTaskFor(task, result);
// 将任务转换成QueueingFuture
executor.execute(new QueueingFuture(f));
return f;
}
ExecutorCompletionService的newTaskFor,将Callable或Runnable任务转成FutureTask。
若线程池不为AbstractExecutorService则,直接new FutureTask;若aes不为null,则调用AbstractExecutorService的newTaskFor。实际也是通过new FutureTask实现。
private RunnableFuture newTaskFor(Callable task) {
if (aes == null)
return new FutureTask(task);
else
return aes.newTaskFor(task);
}
private RunnableFuture newTaskFor(Runnable task, V result) {
if (aes == null)
return new FutureTask(task, result);
else
return aes.newTaskFor(task, result);
}
AbstractExecutorService的newTaskFor
protected RunnableFuture newTaskFor(Runnable runnable, T value) {
return new FutureTask(runnable, value);
}
protected RunnableFuture newTaskFor(Callable callable) {
return new FutureTask(callable);
}
使用案例
场景一:
假设您有一组求解某个问题的求解器,每个求解器返回某种类型的值 Result,并希望同时运行它们,处理每个返回非空值的结果,在某些方法中使用(Result r)。 你可以这样写:
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
public class ExecutorCompletionServiceDemo {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5,
10,
3,
TimeUnit.SECONDS,
new ArrayBlockingQueue(3),
new ThreadPoolExecutor.AbortPolicy());
CompletionService completionService = new ExecutorCompletionService(threadPoolExecutor);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()) + "--" + Thread.currentThread().getName() + "] is beginning. ");
List futures = new ArrayList(3);
futures.add(completionService.submit(() -> {
Thread.sleep(3000);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
return 1;
}));
futures.add(completionService.submit(() -> {
Thread.sleep(2000);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
return 2;
}));
futures.add(completionService.submit(() -> {
Thread.sleep(1000);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
return 3;
}));
Integer r = 0;
try {
for (int i = 0; i {
Thread.sleep(3000);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
return 1;
}));
futures.add(completionService.submit(() -> {
Thread.sleep(2000);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
return 2;
}));
futures.add(completionService.submit(() -> {
Thread.sleep(1000);
System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
return 3;
}));
Integer r = 0;
try {
for (int i = 0; i
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?