您当前的位置: 首页 >  Java

郭梧悠

暂无认证

  • 0浏览

    0关注

    402博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

java并发编程实战读书笔记 ExecutorCompletionService

郭梧悠 发布时间:2021-11-01 10:35:13 ,浏览量:0

当我们向Executor提交一组任务,并且希望任务在完成后获得结果,此时可以考虑使用ExecutorCompletionService。 ExecutorCompletionService实现了CompletionService接口。ExecutorCompletionService将Executor和BlockingQueue功能融合在一起,使用它可以提交我们的Callable任务。这个任务委托给Executor执行,可以使用ExecutorCompletionService对象的take和poll方法获取结果。其中BlockingQueue负责保存计算完成的结果,其基本结构如下:

public class ExecutorCompletionService implements CompletionService {
    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue completionQueue;

      public ExecutorCompletionService(Executor executor) {
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue();
    }
}    

ExecutorCompletionService对象使用submit提交任务,任务同样返回一个Future。且在执行任务的时候,Task会被封装成RunnableFuture,RunnableFuture是一个接口,可以是FutureTask,也可以是其他实现了RunnableFuture的类型。进而封装成QueueingFuture交给executor执行。


 public Future submit(Callable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture f = newTaskFor(task);
        executor.execute(new QueueingFuture(f, completionQueue));
        return f;
    }

  private RunnableFuture newTaskFor(Callable task) {
        if (aes == null)
            return new FutureTask(task);
        else
            return aes.newTaskFor(task);
    }

QueueingFuture是什么呢?它是FutureTask的子类,主要是重写了FutureTask的done方法,将计算结果放到阻塞队列里。

   private static class QueueingFuture extends FutureTask {
        QueueingFuture(RunnableFuture task,
                       BlockingQueue completionQueue) {
            super(task, null);
            this.task = task;
            this.completionQueue = completionQueue;
        }
        //在此处task可以看作是FutureTask.
        private final Future task;
        private final BlockingQueue completionQueue;
        //将计算结果放到阻塞队列里
        protected void done() { completionQueue.add(task); }
    }

注意我们放进队列的是FutureTask.我们从ExecutorCompletionService的take方法里获取到的是一个FutureTask对象,如果想要获取到最终结果,需要调用FutureTask的get方法:

Result r = ecs.take().get();

所以我们可以使用如下代码来使用CompletionService,创建ExecutorCompletionService对象;使用Executor提交任务;使用ExecutorCompletionService的take方法获取FutureTask,然后调用FutureTask的get方法获取结果。假设你有一些任务希望并发的执行,并且向往将结果交给某方法执行,则可以用如下代码来实现:

void solve(Executor e, Collection solvers) {
    
     CompletionService ecs
       = new ExecutorCompletionService(e);
    //循环提交每一个任务   
    for (Callable s : solvers)
      ecs.submit(s);
      
    int n = solvers.size();
    for (int i = 0; i             
关注
打赏
1663674776
查看更多评论
0.0428s