当我们向Executor提交一组任务,并且希望任务在完成后获得结果,此时可以考虑使用ExecutorCompletionService。 ExecutorCompletionService实现了CompletionService接口。ExecutorCompletionService将Executor和BlockingQueue功能融合在一起,使用它可以提交我们的Callable任务。这个任务委托给Executor执行,可以使用ExecutorCompletionService对象的take和poll方法获取结果。其中BlockingQueue负责保存计算完成的结果,其基本结构如下:
public class ExecutorCompletionService implements CompletionService {
private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue completionQueue;
public ExecutorCompletionService(Executor executor) {
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue();
}
}
ExecutorCompletionService对象使用submit提交任务,任务同样返回一个Future。且在执行任务的时候,Task会被封装成RunnableFuture,RunnableFuture是一个接口,可以是FutureTask,也可以是其他实现了RunnableFuture的类型。进而封装成QueueingFuture交给executor执行。
public Future submit(Callable task) {
if (task == null) throw new NullPointerException();
RunnableFuture f = newTaskFor(task);
executor.execute(new QueueingFuture(f, completionQueue));
return f;
}
private RunnableFuture newTaskFor(Callable task) {
if (aes == null)
return new FutureTask(task);
else
return aes.newTaskFor(task);
}
QueueingFuture是什么呢?它是FutureTask的子类,主要是重写了FutureTask的done方法,将计算结果放到阻塞队列里。
private static class QueueingFuture extends FutureTask {
QueueingFuture(RunnableFuture task,
BlockingQueue completionQueue) {
super(task, null);
this.task = task;
this.completionQueue = completionQueue;
}
//在此处task可以看作是FutureTask.
private final Future task;
private final BlockingQueue completionQueue;
//将计算结果放到阻塞队列里
protected void done() { completionQueue.add(task); }
}
注意我们放进队列的是FutureTask.我们从ExecutorCompletionService的take方法里获取到的是一个FutureTask对象,如果想要获取到最终结果,需要调用FutureTask的get方法:
Result r = ecs.take().get();
所以我们可以使用如下代码来使用CompletionService,创建ExecutorCompletionService对象;使用Executor提交任务;使用ExecutorCompletionService的take方法获取FutureTask,然后调用FutureTask的get方法获取结果。假设你有一些任务希望并发的执行,并且向往将结果交给某方法执行,则可以用如下代码来实现:
void solve(Executor e, Collection solvers) {
CompletionService ecs
= new ExecutorCompletionService(e);
//循环提交每一个任务
for (Callable s : solvers)
ecs.submit(s);
int n = solvers.size();
for (int i = 0; i
关注
打赏
- Flutter之Widget构建过程详解
- Flutter之Widget 更新机制原理浅析
- 从源码角度分析android事件分发处理机制
- android事件拦截处理机制详解
- android 获取IP地址
- android 获取生肖和星座
- Android TextView setCompoundDrawables设置图片不显示解决方案
- Android Stuido集成LeakCanary编译失败TransformException、transformClassesWithDexBuilderForDebug
- Flutter实战之go_router路由组件入门指南
- HarmonyOS鸿蒙学习笔记(9)Navigator组件实现页面路由跳转