您当前的位置: 首页 > 

顧棟

暂无认证

  • 0浏览

    0关注

    227博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

【JUC系列】Executor框架之CompletionService

顧棟 发布时间:2022-06-20 21:00:00 ,浏览量:0

【JUC系列】Executor框架之CompletionService

文章目录
  • 【JUC系列】Executor框架之CompletionService
    • CompletionService
    • ExecutorCompletionService
      • 成员变量
      • 内部类QueueingFuture
      • 构造函数
      • 任务执行
    • 使用案例
      • 场景一:
      • 场景二:

需要优先阅读【JUC系列】Executor框架之FutureTask

CompletionService

这是一个接口,提供一种服务,它将新的异步任务的生产与已完成任务的结果的消费分离。生产者提交任务以供执行。消费者接受已完成的任务并按照他们完成的顺序处理他们的结果。

例如,CompletionService 可用于管理异步 I/O,其中执行读取的任务在程序或系统的一个部分中提交,然后在读取完成时在程序的不同部分执行操作,可能在与他们要求的顺序不同。通常,CompletionService 依赖一个单独的 Executor 来实际执行任务,在这种情况下,CompletionService 只管理一个内部完成队列。 ExecutorCompletionService 类提供了这种方法的实现。

内存一致性效果:在将任务提交到 CompletionService 之前线程中的操作发生在该任务所采取的操作之前,这反过来又发生在从相应的 take() 成功返回之后的操作。

接口提供了以下方法

方法名描述Future submit(Callable task)提交一个返回值的任务以供执行,并返回一个表示该任务待处理结果的 Future。 完成后,可以采取或轮询此任务。Future submit(Runnable task, V result)提交 Runnable 任务以执行并返回代表该任务的 Future。 完成后,可以take或poll此任务。Future take() throws InterruptedException检索并删除代表下一个已完成任务的 Future,如果还没有,则等待。Future poll()检索并删除表示下一个已完成任务的 Future,如果不存在,则返回 null。Future poll(long timeout, TimeUnit unit) throws InterruptedException检索并删除代表下一个已完成任务的 Future,如果还没有,则在必要时等待指定的等待时间。 ExecutorCompletionService

ExecutorCompletionService实现了CompletionService。

一个 CompletionService,它使用提供的 Executor 来执行任务。 此类安排提交的任务在完成后放置在使用 take 可访问的队列中。 该类足够轻量级,适合在处理任务组时临时使用。

成员变量
// 执行任务的线程池
private final Executor executor;
// ???
private final AbstractExecutorService aes;
// 任务完成会存放在该阻塞队列中
private final BlockingQueue completionQueue;
内部类QueueingFuture

继承了FutureTask

private class QueueingFuture extends FutureTask {
    QueueingFuture(RunnableFuture task) {
        super(task, null);
        this.task = task;
    }
    // 执行成功,将任务添加到completionQueue队列中。主要在FutureTask执行finishCompletion()时,调用done。
    protected void done() { completionQueue.add(task); }
    private final Future task;
}
构造函数

参数executor:传入的线程池,用来执行任务;默认的队列是LinkedBlockingQueue。

public ExecutorCompletionService(Executor executor) {
    if (executor == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = new LinkedBlockingQueue();
}

参数executor:传入的线程池,用来执行任务;

参数completionQueue:用来记录执行结果的阻塞列表

public ExecutorCompletionService(Executor executor,
                                 BlockingQueue completionQueue) {
    if (executor == null || completionQueue == null)
        throw new NullPointerException();
    this.executor = executor;
    this.aes = (executor instanceof AbstractExecutorService) ?
        (AbstractExecutorService) executor : null;
    this.completionQueue = completionQueue;
}
任务执行
public Future submit(Callable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture f = newTaskFor(task);
    // 将任务转换成QueueingFuture
    executor.execute(new QueueingFuture(f));
    return f;
}
public Future submit(Runnable task, V result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture f = newTaskFor(task, result);
    // 将任务转换成QueueingFuture 
    executor.execute(new QueueingFuture(f));
    return f;
}

ExecutorCompletionService的newTaskFor,将Callable或Runnable任务转成FutureTask。

若线程池不为AbstractExecutorService则,直接new FutureTask;若aes不为null,则调用AbstractExecutorService的newTaskFor。实际也是通过new FutureTask实现。

private RunnableFuture newTaskFor(Callable task) {
    if (aes == null)
        return new FutureTask(task);
    else
        return aes.newTaskFor(task);
}

private RunnableFuture newTaskFor(Runnable task, V result) {
    if (aes == null)
        return new FutureTask(task, result);
    else
        return aes.newTaskFor(task, result);
}

AbstractExecutorService的newTaskFor

protected  RunnableFuture newTaskFor(Runnable runnable, T value) {
    return new FutureTask(runnable, value);
}

protected  RunnableFuture newTaskFor(Callable callable) {
    return new FutureTask(callable);
}
使用案例 场景一:

假设您有一组求解某个问题的求解器,每个求解器返回某种类型的值 Result,并希望同时运行它们,处理每个返回非空值的结果,在某些方法中使用(Result r)。 你可以这样写:

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.*;

public class ExecutorCompletionServiceDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                5,
                10,
                3,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue(3),
                new ThreadPoolExecutor.AbortPolicy());

        CompletionService completionService = new ExecutorCompletionService(threadPoolExecutor);
        System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()) + "--" + Thread.currentThread().getName() + "] is beginning. ");

        List futures = new ArrayList(3);
        futures.add(completionService.submit(() -> {
            Thread.sleep(3000);
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
            return 1;
        }));
        futures.add(completionService.submit(() -> {
            Thread.sleep(2000);
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
            return 2;
        }));
        futures.add(completionService.submit(() -> {
            Thread.sleep(1000);
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
            return 3;
        }));

        Integer r = 0;
        try {
            for (int i = 0; i  {
            Thread.sleep(3000);
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
            return 1;
        }));
        futures.add(completionService.submit(() -> {
            Thread.sleep(2000);
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");

            return 2;
        }));
        futures.add(completionService.submit(() -> {
            Thread.sleep(1000);
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()) + "--" + Thread.currentThread().getName() + "] sleep is over. ");
            return 3;
        }));

        Integer r = 0;
        try {
            for (int i = 0; i             
关注
打赏
1663402667
查看更多评论
0.0511s