您当前的位置: 首页 > 

【JUC系列】Executor框架之FutureTask

发布时间:2022-06-17 06:00:00 ,浏览量:6

JUC系列Executor框架之FutureTask

JDK 版本1.8

文章目录
  • JUC系列Executor框架之FutureTask
    • 使用示例
      • 示例一
      • 示例二
    • 源码分析
      • 类图
      • Callable接口
      • Future接口
      • RunnableFuture接口
      • 内部类WaitNode
      • 成员变量
        • 任务状态
          • 状态关系图
      • 构造函数
      • 核心方法
        • 执行任务
        • 获取任务执行结果
        • 取消任务执行
可取消的异步计算。 此类提供 Future 的基本实现,包括启动和取消计算、查询计算是否完成以及检索计算结果的方法。 只有在计算完成后才能检索结果; 如果计算尚未完成,get 方法将阻塞。 一旦计算完成,就不能重新开始或取消计算(除非使用 runAndReset 调用计算)。 FutureTask 可用于包装 Callable 或 Runnable 对象。 因为 FutureTask 实现了 Runnable,所以一个 FutureTask 可以提交给一个 Executor 执行。FutureTask 的线程安全由CAS来保证。 使用示例 示例一
import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.*; public class FutureTaskDemo<T> { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = Executors.newCachedThreadPool(); FutureTask<Integer> futureTask = new FutureTask<Integer>(new Task()); executor.submit(futureTask); System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] result = " + futureTask.get() + "."); executor.shutdown(); } static class Task implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] is running. "); return 0; } } } 

执行结果

[16:13:02--pool-1-thread-1] is running. [16:13:02--main] result = 0. 
import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; public class FutureTaskDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService executor = new ThreadPoolExecutor(10 , 256 , 0L , TimeUnit.MILLISECONDS , new ArrayBlockingQueue<>(500) , new UserThreadFactory("X") ); FutureTask<Integer> futureTask = new FutureTask<>(new Task(String.valueOf(1))); executor.submit(futureTask); boolean flag = true; while (flag) { if (futureTask.isDone()) { System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(System.currentTimeMillis()) + "--" + Thread.currentThread().getName() + "] result = " + futureTask.get() + "."); flag = false; } } executor.shutdown(); } static class Task implements Callable<Integer> { private final String name; Task(String name) { this.name = name; } @Override public Integer call() throws Exception { System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()) + "--" + Thread.currentThread().getName() + "] task:" + name + " is running."); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("[" + new SimpleDateFormat("HH:mm:ss.SSS").format(new Date()) + "--" + Thread.currentThread().getName() + "] task:" + name + " is completed."); return 2; } } public static class UserThreadFactory implements ThreadFactory { private final String namePrefix; private final AtomicInteger nextId = new AtomicInteger(1); UserThreadFactory(String whatFeatureOfGroup) { this.namePrefix = "From UserThreadFactory's " + whatFeatureOfGroup + "-Worker-"; } @Override public Thread newThread(Runnable task) { String name = namePrefix + nextId.getAndIncrement(); return new Thread(null, task, name, 0); } } } 

执行结果

[19:18:05:005--From UserThreadFactory's X-Worker-1] task:1 is running. [19:18:07:007--From UserThreadFactory's X-Worker-1] task:1 is completed. [19:18:07:007--main] result = 2. 
示例二
import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.*; public class FutureTaskDemo<T> { public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask<Integer> futureTask2 = new FutureTask<>(new Task()); Thread thread = new Thread(futureTask2); thread.setName("Task thread"); thread.start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] is running. "); if (!futureTask2.isDone()) { System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] Task is not done. "); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } else { System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] Task is done. "); } int result = 0; try { result = futureTask2.get(); } catch (Exception e) { e.printStackTrace(); } System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] result = " + result + "."); } static class Task implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "--" + Thread.currentThread().getName() + "] is running. "); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return 2; } } } 

执行结果

[16:19:30--Task thread] is running. [16:19:30--main] is running. [16:19:30--main] Task is done. [16:19:30--main] result = 2. 
源码分析 类图

在这里插入图片描述

Callable接口

Callable是个泛型接口,泛型V就是要call()方法返回的类型。对比Runnable接口,Runnable不会返回数据也不能抛出异常。

@FunctionalInterface public interface Runnable { public abstract void run(); } 
Future接口

Future接口代表异步计算的结果,通过Future接口提供的方法可以查看异步计算是否执行完成,或者等待执行结果并获取执行结果,同时还可以取消执行。Future接口的定义如下:

public interface Future<V> { /** 
     * cancel()方法用来取消异步任务的执行。
     * 如果异步任务已经完成或者已经被取消,或者由于某些原因不能取消,则会返回false。
     * 如果任务还没有被执行,则会返回true并且异步任务不会被执行。
     * 如果任务已经开始执行了但是还没有执行完成,若mayInterruptIfRunning为true,则会立即中断执行任务的线程并返回true,若mayInterruptIfRunning为false,则会返回true且不会中断任务执行线程。
     */ boolean cancel(boolean mayInterruptIfRunning); /**
     * 判断任务是否被取消,如果任务在结束(正常执行结束或者执行异常结束)前被取消则返回true,否则返回false。
     */ boolean isCancelled(); /**
     * 判断任务是否已经完成,如果完成则返回true,否则返回false。
     * 需要注意的是:任务执行过程中发生异常、任务被取消也属于任务已完成,也会返回true。
     */ boolean isDone(); /**
     * 获取任务执行结果,如果任务还没完成则会阻塞等待直到任务执行完成。
     * 如果任务被取消则会抛出CancellationException异常,
     * 如果任务执行过程发生异常则会抛出ExecutionException异常,
     * 如果阻塞等待过程中被中断则会抛出InterruptedException异常。
     */ V get() throws InterruptedException, ExecutionException; /**
     * 带超时时间的get()版本,如果阻塞等待过程中超时则会抛出TimeoutException异常。
     */ V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; } 
RunnableFuture接口

将此 Future 设置为其计算结果,除非它已被取消。

public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); } 
内部类WaitNode

一个单向的链表结构,用来表示排队的线程。

static final class WaitNode { // 当前线程 volatile Thread thread; // 下一个等待结点 volatile WaitNode next; WaitNode() { thread = Thread.currentThread(); } } 
成员变量
// 内部持有的callable任务,运行完毕后置空 private Callable<V> callable; // 从get()中返回的结果或抛出的异常 private Object outcome; // 执行callable的线程 private volatile Thread runner; // 使用Treiber栈保存等待线程 private volatile WaitNode waiters; 
任务状态

使用一个volatile修饰的int型变量state作为任务状态,只要有任何一个线程修改了这个变量,那么其他所有的线程都会知道最新的值。

// 任务状态 private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6; 
  • NEW:表示是个新的任务或者还没被执行完的任务。这是初始状态。

  • COMPLETING:任务已经执行完成或者执行任务的时候发生异常,但是任务执行结果或者异常原因还没有保存到outcome字段(outcome字段用来保存任务执行结果,如果发生异常,则用来保存异常原因)的时候,状态会从NEW变更到COMPLETING。但是这个状态会时间会比较短,属于中间状态。

  • NORMAL:任务已经执行完成并且任务执行结果已经保存到outcome字段,状态会从COMPLETING转换到NORMAL。这是一个最终态。

  • EXCEPTIONAL:任务执行发生异常并且异常原因已经保存到outcome字段中后,状态会从COMPLETING转换到EXCEPTIONAL。这是一个最终态。

  • CANCELLED:任务还没开始执行或者已经开始执行但是还没有执行完成的时候,用户调用了cancel(false)方法取消任务且不中断任务执行线程,这个时候状态会从NEW转化为CANCELLED状态。这是一个最终态。

  • INTERRUPTING: 任务还没开始执行或者已经执行但是还没有执行完成的时候,用户调用了cancel(true)方法取消任务并且要中断任务执行线程但是还没有中断任务执行线程之前,状态会从NEW转化为INTERRUPTING。这是一个中间状态。

  • INTERRUPTED:调用interrupt()中断任务执行线程之后状态会从INTERRUPTING转换到INTERRUPTED。这是一个最终态。 有一点需要注意的是,所有值大于COMPLETING的状态都表示任务已经执行完成(任务正常执行完成,任务执行异常或者任务被取消)。

状态关系图

在这里插入图片描述

构造函数

这个构造函数会把传入的Callable变量保存在this.callable字段中,该字段定义为private Callablecallable;用来保存底层的调用,在被执行完成以后会指向null,接着会初始化state字段为NEW。

public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } 

这个构造函数会把传入的Runnable封装成一个Callable对象保存在callable字段中,同时如果任务执行成功的话就会返回传入的result。这种情况下如果不需要返回值的话可以传入一个null。Executors.callable(runnable, result)方法是通过适配器模式将Runnable转换成了Callable。

public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } 

Executors类下的方法和内部类

public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); } 
static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } } 
核心方法 方法名 描述 boolean cancel(boolean mayInterruptIfRunning) 尝试取消任务。如果任务已经完成或已经被取消,此操作会失败。 V get() throws InterruptedException, ExecutionException FutureTask 通过get()方法获取任务执行结果。如果任务处于未完成的状态(state <= COMPLETING),就调用awaitDone方法(后面单独讲解)等待任务完成。任务完成后,通过report方法获取执行结果或抛出执行期间的异常。 void run() 运行任务 执行任务
public void run() { // 当FutureTask的状态不是NEW 或者无法将当前线程变为任务的执行线程无法继续执行run方法,返回。 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { // 获取需要执行的任务 Callable<V> c = callable; if (c != null && state == NEW) { // 说明此时任务的状态是NEW且执行任务的线程是当前线程 V result; // 任务是否被执行的标识。true代表以及成功执行 false代表执行出现了异常 boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; // 执行异常 setException(ex); } if (ran) // 成功执行,设置结果 set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) // 在中断过程和中断后,确保来自可能的 cancel(true) 的任何中断仅在运行或 runAndReset 时传递给任务。 handlePossibleCancellationInterrupt(s); } } 
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 异常的时候outcome得到的是异常信息 outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } } 
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 成功执行当得到执行结果 outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } } 

执行收尾方法,唤醒等待线程

private void finishCompletion() { // assert state > COMPLETING; // 如果等待的线程结点不为null for (WaitNode q; (q = waiters) != null;) { // 清空这个Futuretask的waiters if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { // 唤醒等待的线程,并解开waiters链表 for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } // 在唤醒等待线程后执行,可以自定义扩展 done(); callable = null; // to reduce footprint } 

对中断的情况进行处理

private void handlePossibleCancellationInterrupt(int s) { // It is possible for our interrupter to stall before getting a // chance to interrupt us.  Let's spin-wait patiently. // 在中断者中断线程之前可能会延迟,所以我们只需要让出CPU时间片自旋等待 if (s == INTERRUPTING) while (state == INTERRUPTING) Thread.yield(); // wait out pending interrupt // assert state == INTERRUPTED; // We want to clear any interrupt we may have received from // cancel(true).  However, it is permissible to use interrupts // as an independent mechanism for a task to communicate with // its caller, and there is no way to clear only the // cancellation interrupt. // // Thread.interrupted(); } 
获取任务执行结果
public V get() throws InterruptedException, ExecutionException { int s = state; // 如果状态是NEW或则COMPLETING,则需要等待执行结果 if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); } 

timed是都存在超时,true 存在等待超时,false不存在等待超时

nanos:超时时长,毫秒。

private int awaitDone(boolean timed, long nanos) throws InterruptedException { // 最终时间 final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; // 自旋 for (;;) { // 1.若线程被中断了,则需要排除等待节点,抛出中断异常 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } // 2.再次检查状态,若状态已经跳过COMPLETING,则返回最新状态 int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } // 3.若任务还在COMPLETING中,采用线程让步,将当前线程的CPU资源让出来,重新循环。 else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 4.此时说明 状态应该是NEW,且 尚未有等待节点,创建一个首个等待节点 else if (q == null) q = new WaitNode(); // 队列为空 - CAS对waiters的值进行更新,头插法 else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 5.存在超时则 计算等待超时,若已经超时移除等待节点,返回;若未超时,则采用阻塞超时限制的方式阻塞线程 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else // 阻塞线程 LockSupport.park(this); } } 

尝试取消链接超时或中断的等待节点以避免积累垃圾。 内部节点在没有 CAS 的情况下简单地解开,因为如果它们被释放者遍历是无害的。 为了避免从已删除的节点中解开的影响,如果出现明显的竞争,将重新遍历列表。 当有很多节点时,这很慢,但我们不希望列表足够长以超过开销更高的方案。

private void removeWaiter(WaitNode node) { if (node != null) { node.thread = null; retry: for (;;) { // restart on removeWaiter race // 从waiters的首结点开始遍历,移除node for (WaitNode pred = null, q = waiters, s; q != null; q = s) { s = q.next; if (q.thread != null) pred = q; else if (pred != null) { pred.next = s; // 若前驱结点中没有null则需要检查冲突,从头遍历 if (pred.thread == null) // check for race continue retry; } // CAS操作失败,存在冲突,从头遍历 else if (!UNSAFE.compareAndSwapObject(this, waitersOffset, q, s)) continue retry; } break; } } } 

返回不同状态的结果信息。

private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); } 
取消任务执行

mayInterruptIfRunning:true代表正在中断;false代表以及取消了。只能对状态NEW的进行取消,不然返回false。

public boolean cancel(boolean mayInterruptIfRunning) { // 1.若state==NEW,会更根据mayInterruptIfRunning,尝试将线程改为INTERRUPTING或CANCELLED, if (!(state == NEW && UNSAFE.compareAndSwapInt(this, stateOffset, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; //执行至此代表原来的状态为NEW以及以及改为INTERRUPTING或CANCELLED try { // in case call to interrupt throws exception // 若是中断,则需要对线程进行中断,并将状态改为INTERRUPTED if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { // 执行收尾方法,唤醒等待线程 finishCompletion(); } return true; } 

参考 https://www.pdai.tech/md/java/thread/java-thread-x-juc-executor-FutureTask.html#futuretask%E7%A4%BA%E4%BE%8B

关注
打赏
1688896170
查看更多评论

暂无认证

  • 6浏览

    0关注

    115984博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文
立即登录/注册

微信扫码登录

0.0590s