您当前的位置: 首页 > 

顧棟

暂无认证

  • 0浏览

    0关注

    227博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

【JUC系列】Executor框架之FutureTask

顧棟 发布时间:2022-06-17 06:00:00 ,浏览量:0

JUC系列Executor框架之FutureTask

JDK 版本1.8

文章目录
  • JUC系列Executor框架之FutureTask
    • 使用示例
      • 示例一
      • 示例二
    • 源码分析
      • 类图
      • Callable接口
      • Future接口
      • RunnableFuture接口
      • 内部类WaitNode
      • 成员变量
        • 任务状态
          • 状态关系图
      • 构造函数
      • 核心方法
        • 执行任务
        • 获取任务执行结果
        • 取消任务执行
可取消的异步计算。 此类提供 Future 的基本实现,包括启动和取消计算、查询计算是否完成以及检索计算结果的方法。 只有在计算完成后才能检索结果; 如果计算尚未完成,get 方法将阻塞。 一旦计算完成,就不能重新开始或取消计算(除非使用 runAndReset 调用计算)。 FutureTask 可用于包装 Callable 或 Runnable 对象。 因为 FutureTask 实现了 Runnable,所以一个 FutureTask 可以提交给一个 Executor 执行。FutureTask 的线程安全由CAS来保证。

使用示例 示例一
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.*;

public class FutureTaskDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        FutureTask futureTask = new FutureTask(new Task());
        executor.submit(futureTask);
        System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] result = " + futureTask.get() + ".");
        executor.shutdown();
    }

    static class Task implements Callable {
        @Override
        public Integer call() throws Exception {
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] is running. ");

            return 0;
        }
    }
}

执行结果

[16:13:02--pool-1-thread-1] is running. 
[16:13:02--main] result = 0.
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class FutureTaskDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        ExecutorService executor = new ThreadPoolExecutor(10
                , 256
                , 0L
                , TimeUnit.MILLISECONDS
                , new ArrayBlockingQueue(500)
                , new UserThreadFactory("X")
        );

        FutureTask futureTask = new FutureTask(new Task(String.valueOf(1)));
        executor.submit(futureTask);
        boolean flag = true;
        while (flag) {
            if (futureTask.isDone()) {
                System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(System.currentTimeMillis()) + "--" + Thread.currentThread().getName() + "] result = " + futureTask.get() + ".");
                flag = false;
            }
        }
        executor.shutdown();
    }

    static class Task implements Callable {
        private final String name;

        Task(String name) {
            this.name = name;
        }

        @Override
        public Integer call() throws Exception {
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()) + "--" + Thread.currentThread().getName() + "] task:" + name + " is running.");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()) + "--" + Thread.currentThread().getName() + "] task:" + name + " is completed.");
            return 2;
        }
    }

    public static class UserThreadFactory implements ThreadFactory {
        private final String namePrefix;
        private final AtomicInteger nextId = new AtomicInteger(1);

        UserThreadFactory(String whatFeatureOfGroup) {
            this.namePrefix = "From UserThreadFactory's " + whatFeatureOfGroup + "-Worker-";
        }

        @Override
        public Thread newThread(Runnable task) {
            String name = namePrefix + nextId.getAndIncrement();
            return new Thread(null, task, name, 0);
        }
    }
}

执行结果

[19:18:05:005--From UserThreadFactory's X-Worker-1] task:1 is running.
[19:18:07:007--From UserThreadFactory's X-Worker-1] task:1 is completed.
[19:18:07:007--main] result = 2.
示例二
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.*;

public class FutureTaskDemo {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask futureTask2 = new FutureTask(new Task());
        Thread thread = new Thread(futureTask2);
        thread.setName("Task thread");
        thread.start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] is running. ");

        if (!futureTask2.isDone()) {
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] Task is not done. ");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] Task is done. ");
        }
        int result = 0;
        try {
            result = futureTask2.get();
        } catch (Exception e) {
            e.printStackTrace();
        }

        System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] result = " + result + ".");
    }

    static class Task implements Callable {
        @Override
        public Integer call() throws Exception {
            System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] is running. ");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 2;
        }
    }
}

执行结果

[16:19:30--Task thread] is running. 
[16:19:30--main] is running. 
[16:19:30--main] Task is done. 
[16:19:30--main] result = 2.
源码分析 类图

在这里插入图片描述

Callable接口

Callable是个泛型接口,泛型V就是要call()方法返回的类型。对比Runnable接口,Runnable不会返回数据也不能抛出异常。

@FunctionalInterface
public interface Runnable {
    public abstract void run();
}
Future接口

Future接口代表异步计算的结果,通过Future接口提供的方法可以查看异步计算是否执行完成,或者等待执行结果并获取执行结果,同时还可以取消执行。Future接口的定义如下:

public interface Future {
	/** 
     * cancel()方法用来取消异步任务的执行。
     * 如果异步任务已经完成或者已经被取消,或者由于某些原因不能取消,则会返回false。
     * 如果任务还没有被执行,则会返回true并且异步任务不会被执行。
     * 如果任务已经开始执行了但是还没有执行完成,若mayInterruptIfRunning为true,则会立即中断执行任务的线程并返回true,若mayInterruptIfRunning为false,则会返回true且不会中断任务执行线程。
     */
    boolean cancel(boolean mayInterruptIfRunning);

    /**
     * 判断任务是否被取消,如果任务在结束(正常执行结束或者执行异常结束)前被取消则返回true,否则返回false。
     */
    boolean isCancelled();

    /**
     * 判断任务是否已经完成,如果完成则返回true,否则返回false。
     * 需要注意的是:任务执行过程中发生异常、任务被取消也属于任务已完成,也会返回true。
     */
    boolean isDone();

    /**
     * 获取任务执行结果,如果任务还没完成则会阻塞等待直到任务执行完成。
     * 如果任务被取消则会抛出CancellationException异常,
     * 如果任务执行过程发生异常则会抛出ExecutionException异常,
     * 如果阻塞等待过程中被中断则会抛出InterruptedException异常。
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * 带超时时间的get()版本,如果阻塞等待过程中超时则会抛出TimeoutException异常。
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
RunnableFuture接口

将此 Future 设置为其计算结果,除非它已被取消。

public interface RunnableFuture extends Runnable, Future {
    void run();
}
内部类WaitNode

一个单向的链表结构,用来表示排队的线程。

static final class WaitNode {
    // 当前线程
    volatile Thread thread;
    // 下一个等待结点
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}
成员变量
    // 内部持有的callable任务,运行完毕后置空
	private Callable callable;
	// 从get()中返回的结果或抛出的异常
    private Object outcome;
	// 执行callable的线程
    private volatile Thread runner;
	// 使用Treiber栈保存等待线程
    private volatile WaitNode waiters;
任务状态

使用一个volatile修饰的int型变量state作为任务状态,只要有任何一个线程修改了这个变量,那么其他所有的线程都会知道最新的值。

	// 任务状态
	private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;
  • NEW:表示是个新的任务或者还没被执行完的任务。这是初始状态。

  • COMPLETING:任务已经执行完成或者执行任务的时候发生异常,但是任务执行结果或者异常原因还没有保存到outcome字段(outcome字段用来保存任务执行结果,如果发生异常,则用来保存异常原因)的时候,状态会从NEW变更到COMPLETING。但是这个状态会时间会比较短,属于中间状态。

  • NORMAL:任务已经执行完成并且任务执行结果已经保存到outcome字段,状态会从COMPLETING转换到NORMAL。这是一个最终态。

  • EXCEPTIONAL:任务执行发生异常并且异常原因已经保存到outcome字段中后,状态会从COMPLETING转换到EXCEPTIONAL。这是一个最终态。

  • CANCELLED:任务还没开始执行或者已经开始执行但是还没有执行完成的时候,用户调用了cancel(false)方法取消任务且不中断任务执行线程,这个时候状态会从NEW转化为CANCELLED状态。这是一个最终态。

  • INTERRUPTING: 任务还没开始执行或者已经执行但是还没有执行完成的时候,用户调用了cancel(true)方法取消任务并且要中断任务执行线程但是还没有中断任务执行线程之前,状态会从NEW转化为INTERRUPTING。这是一个中间状态。

  • INTERRUPTED:调用interrupt()中断任务执行线程之后状态会从INTERRUPTING转换到INTERRUPTED。这是一个最终态。 有一点需要注意的是,所有值大于COMPLETING的状态都表示任务已经执行完成(任务正常执行完成,任务执行异常或者任务被取消)。

状态关系图

在这里插入图片描述

构造函数

这个构造函数会把传入的Callable变量保存在this.callable字段中,该字段定义为private Callable callable;用来保存底层的调用,在被执行完成以后会指向null,接着会初始化state字段为NEW。

public FutureTask(Callable callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

这个构造函数会把传入的Runnable封装成一个Callable对象保存在callable字段中,同时如果任务执行成功的话就会返回传入的result。这种情况下如果不需要返回值的话可以传入一个null。 Executors.callable(runnable, result)方法是通过适配器模式将Runnable转换成了Callable。

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

Executors类下的方法和内部类

public static  Callable callable(Runnable task, T result) {
    if (task == null)
        throw new NullPointerException();
    return new RunnableAdapter(task, result);
}
static final class RunnableAdapter implements Callable {
    final Runnable task;
    final T result;
    RunnableAdapter(Runnable task, T result) {
        this.task = task;
        this.result = result;
    }
    public T call() {
        task.run();
        return result;
    }
}
核心方法 方法名描述boolean cancel(boolean mayInterruptIfRunning)尝试取消任务。如果任务已经完成或已经被取消,此操作会失败。V get() throws InterruptedException, ExecutionExceptionFutureTask 通过get()方法获取任务执行结果。如果任务处于未完成的状态(state = INTERRUPTING) // 在中断过程和中断后,确保来自可能的 cancel(true) 的任何中断仅在运行或 runAndReset 时传递给任务。 handlePossibleCancellationInterrupt(s); } }
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 异常的时候outcome得到的是异常信息
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        // 成功执行当得到执行结果
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

执行收尾方法,唤醒等待线程

private void finishCompletion() {
    // assert state > COMPLETING;
    // 如果等待的线程结点不为null
    for (WaitNode q; (q = waiters) != null;) {
        // 清空这个Futuretask的waiters
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            // 唤醒等待的线程,并解开waiters链表
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    // 在唤醒等待线程后执行,可以自定义扩展
    done();

    callable = null;        // to reduce footprint
}

对中断的情况进行处理

private void handlePossibleCancellationInterrupt(int s) {
    // It is possible for our interrupter to stall before getting a
    // chance to interrupt us.  Let's spin-wait patiently.
    // 在中断者中断线程之前可能会延迟,所以我们只需要让出CPU时间片自旋等待
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); // wait out pending interrupt

    // assert state == INTERRUPTED;

    // We want to clear any interrupt we may have received from
    // cancel(true).  However, it is permissible to use interrupts
    // as an independent mechanism for a task to communicate with
    // its caller, and there is no way to clear only the
    // cancellation interrupt.
    //
    // Thread.interrupted();
}
获取任务执行结果
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    // 如果状态是NEW或则COMPLETING,则需要等待执行结果
    if (s  COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        // 3.若任务还在COMPLETING中,采用线程让步,将当前线程的CPU资源让出来,重新循环。
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        // 4.此时说明 状态应该是NEW,且 尚未有等待节点,创建一个首个等待节点
        else if (q == null)
            q = new WaitNode();
        // 队列为空 - CAS对waiters的值进行更新,头插法
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        // 5.存在超时则 计算等待超时,若已经超时移除等待节点,返回;若未超时,则采用阻塞超时限制的方式阻塞线程
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos = CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}
取消任务执行

mayInterruptIfRunning:true代表正在中断;false代表以及取消了。只能对状态NEW的进行取消,不然返回false。

public boolean cancel(boolean mayInterruptIfRunning) {
    // 1.若state==NEW,会更根据mayInterruptIfRunning,尝试将线程改为INTERRUPTING或CANCELLED,
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    
    //执行至此代表原来的状态为NEW以及以及改为INTERRUPTING或CANCELLED
    try {    // in case call to interrupt throws exception
        // 若是中断,则需要对线程进行中断,并将状态改为INTERRUPTED
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        // 执行收尾方法,唤醒等待线程
        finishCompletion();
    }
    return true;
}

参考 https://www.pdai.tech/md/java/thread/java-thread-x-juc-executor-FutureTask.html#futuretask%E7%A4%BA%E4%BE%8B

关注
打赏
1663402667
查看更多评论
立即登录/注册

微信扫码登录

0.0401s