您当前的位置: 首页 >  Java

wespten

暂无认证

  • 1浏览

    0关注

    899博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

深入学习java源码之Callable.call()与Future.get()

wespten 发布时间:2019-02-03 08:07:50 ,浏览量:1

深入学习java源码之Callable.call()与Future.get()

Callable和Future出现的原因

创建线程的2种方式,一种是直接继承Thread,另外一种就是实现Runnable接口。  这2种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。  如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。

而自从Java 1.5开始,就提供了Callable和Future,通过它们可以在任务执行完毕之后得到任务执行结果。

Callable和Future介绍

Callable接口代表一段可以调用并返回结果的代码;Future接口表示异步任务,是还没有完成的任务给出的未来结果。所以说Callable用于产生结果,Future用于获取结果。

Callable接口使用泛型去定义它的返回类型。Executors类提供了一些有用的方法在线程池中执行Callable内的任务。由于Callable任务是并行的(并行就是整体看上去是并行的,其实在某个时间点只有一个线程在执行),我们必须等待它返回的结果。  java.util.concurrent.Future对象为我们解决了这个问题。在线程池提交Callable任务后返回了一个Future对象,使用它可以知道Callable任务的状态和得到Callable返回的执行结果。Future提供了get()方法让我们可以等待Callable结束并获取它的执行结果。

Callable位于java.util.concurrent包下,它也是一个接口,在它里面也只声明了一个方法,只不过这个方法叫做call():

public interface Callable {
    V call() throws Exception;
}

可以看到,这是一个泛型接口,call()函数返回的类型就是传递进来的V类型。

那么怎么使用Callable呢?

一般情况下是配合ExecutorService来使用的,在ExecutorService接口中声明了若干个submit方法的重载版本:

 Future submit(Callable task);
 Future submit(Runnable task, T result);
Future submit(Runnable task);

第一个submit方法里面的参数类型就是Callable。

暂时只需要知道Callable一般是和ExecutorService配合来使用的,具体的使用方法讲在后面讲述。

一般情况下我们使用第一个submit方法和第三个submit方法,第二个submit方法很少使用。

Future

  Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

在Future接口中声明了5个方法,下面依次解释每个方法的作用:

cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。

isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。

isDone方法表示任务是否已经完成,若任务完成,则返回true;

get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;

get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。

也就是说Future提供了三种功能:

  1)判断任务是否完成;

  2)能够中断任务;

  3)能够获取任务执行结果。

  因为Future只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的FutureTask。

public interface RunnableFuture extends Runnable, Future {  
    void run();  
}  

可以看到这个接口实现了Runnable和Future接口,接口中的具体实现由FutureTask来实现。这个类的两个构造方法如下 :

public FutureTask(Callable callable) {  
        if (callable == null)  
            throw new NullPointerException();  
        sync = new Sync(callable);  
    }  
    public FutureTask(Runnable runnable, V result) {  
        sync = new Sync(Executors.callable(runnable, result));  
    }  

如上提供了两个构造函数,一个以Callable为参数,另外一个以Runnable为参数。这些类之间的关联对于任务建模的办法非常灵活,允许你基于FutureTask的Runnable特性(因为它实现了Runnable接口),把任务写成Callable,然后封装进一个由执行者调度并在必要时可以取消的FutureTask。

FutureTask可以由执行者调度,这一点很关键。它对外提供的方法基本上就是Future和Runnable接口的组合:get()、cancel、isDone()、isCancelled()和run(),而run()方法通常都是由执行者调用,我们基本上不需要直接调用它。

 

使用Callable,Future返回结果 Future代表一个异步执行的操作,通过get()方法可以获得操作的结果,如果异步操作还没有完成,则,get()会使当前线程阻塞。FutureTask实现了Future和Runable。Callable代表一个有返回值得操作。

   Callable func = new Callable(){  
        public Integer call() throws Exception {  
            System.out.println("inside callable");  
            Thread.sleep(1000);  
            return new Integer(8);  
        }         
    };        
    FutureTask futureTask  = new FutureTask(func);  
    Thread newThread = new Thread(futureTask);  
    newThread.start();  
      
    try {  
        System.out.println("blocking here");  
        Integer result = futureTask.get();  
        System.out.println(result);  
    } catch (InterruptedException ignored) {  
    } catch (ExecutionException ignored) {  
    } 

 ExecutoreService提供了submit()方法,传递一个Callable,或Runnable,返回Future。如果Executor后台线程池还没有完成Callable的计算,这调用返回Future对象的get()方法,会阻塞直到计算完成。

FutureTask的例子

public class MyCallable implements Callable {  
    private long waitTime;   
    public MyCallable(int timeInMillis){   
        this.waitTime=timeInMillis;  
    }  
    @Override  
    public String call() throws Exception {  
        Thread.sleep(waitTime);  
        //return the thread name executing this callable task  
        return Thread.currentThread().getName();  
    }  

}  
public class FutureTaskExample {  
     public static void main(String[] args) {  
        MyCallable callable1 = new MyCallable(1000);                       // 要执行的任务  
        MyCallable callable2 = new MyCallable(2000);  

        FutureTask futureTask1 = new FutureTask(callable1);// 将Callable写的任务封装到一个由执行者调度的FutureTask对象  
        FutureTask futureTask2 = new FutureTask(callable2);  

        ExecutorService executor = Executors.newFixedThreadPool(2);        // 创建线程池并返回ExecutorService实例  
        executor.execute(futureTask1);  // 执行任务  
        executor.execute(futureTask2);    

        while (true) {  
            try {  
                if(futureTask1.isDone() && futureTask2.isDone()){//  两个任务都完成  
                    System.out.println("Done");  
                    executor.shutdown();                          // 关闭线程池和服务   
                    return;  
                }  

                if(!futureTask1.isDone()){ // 任务1没有完成,会等待,直到任务完成  
                    System.out.println("FutureTask1 output="+futureTask1.get());  
                }  

                System.out.println("Waiting for FutureTask2 to complete");  
                String s = futureTask2.get(200L, TimeUnit.MILLISECONDS);  
                if(s !=null){  
                    System.out.println("FutureTask2 output="+s);  
                }  
            } catch (InterruptedException | ExecutionException e) {  
                e.printStackTrace();  
            }catch(TimeoutException e){  
                //do nothing  
            }  
        }  
    }  
}  

运行如上程序后,可以看到一段时间内没有输出,因为get()方法等待任务执行完成然后才输出内容.

输出结果如下:

FutureTask1 output=pool-1-thread-1
Waiting for FutureTask2 to complete
Waiting for FutureTask2 to complete
Waiting for FutureTask2 to complete
Waiting for FutureTask2 to complete
Waiting for FutureTask2 to complete
FutureTask2 output=pool-1-thread-2
Done

 

例子:并行计算数组的和。

    package executorservice;  
      
    import java.util.ArrayList;  
    import java.util.List;  
    import java.util.concurrent.Callable;  
    import java.util.concurrent.ExecutionException;  
    import java.util.concurrent.ExecutorService;  
    import java.util.concurrent.Executors;  
    import java.util.concurrent.Future;  
    import java.util.concurrent.FutureTask;  
      
    public class ConcurrentCalculator {  
      
        private ExecutorService exec;  
        private int cpuCoreNumber;  
        private List tasks = new ArrayList();  
      
        // 内部类  
        class SumCalculator implements Callable {  
            private int[] numbers;  
            private int start;  
            private int end;  
      
            public SumCalculator(final int[] numbers, int start, int end) {  
                this.numbers = numbers;  
                this.start = start;  
                this.end = end;  
            }  
      
            public Long call() throws Exception {  
                Long sum = 0l;  
                for (int i = start; i < end; i++) {  
                    sum += numbers[i];  
                }  
                return sum;  
            }  
        }  
      
        public ConcurrentCalculator() {  
            cpuCoreNumber = Runtime.getRuntime().availableProcessors();  
            exec = Executors.newFixedThreadPool(cpuCoreNumber);  
        }  
      
        public Long sum(final int[] numbers) {  
            // 根据CPU核心个数拆分任务,创建FutureTask并提交到Executor  
            for (int i = 0; i < cpuCoreNumber; i++) {  
                int increment = numbers.length / cpuCoreNumber + 1;  
                int start = increment * i;  
                int end = increment * i + increment;  
                if (end > numbers.length)  
                    end = numbers.length;  
                SumCalculator subCalc = new SumCalculator(numbers, start, end);  
                FutureTask task = new FutureTask(subCalc);  
                tasks.add(task);  
                if (!exec.isShutdown()) {  
                    exec.submit(task);  
                }  
            }  
            return getResult();  
        }  
      
        /** 
         * 迭代每个只任务,获得部分和,相加返回 
         *  
         * @return 
         */  
        public Long getResult() {  
            Long result = 0l;  
            for (Future task : tasks) {  
                try {  
                    // 如果计算未完成则阻塞  
                    Long subSum = task.get();  
                    result += subSum;  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                } catch (ExecutionException e) {  
                    e.printStackTrace();  
                }  
            }  
            return result;  
        }  
      
        public void close() {  
            exec.shutdown();  
        }  
    }
   int[] numbers = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 10, 11 };  
    ConcurrentCalculator calc = new ConcurrentCalculator();  
    Long sum = calc.sum(numbers);  
    System.out.println(sum);  
    calc.close();

 

java源码

Modifier and TypeMethod and DescriptionVcall()

计算一个结果,如果不能这样做,就会抛出一个异常。

package java.util.concurrent;

@FunctionalInterface
public interface Callable {
    V call() throws Exception;
}
Modifier and TypeMethod and Descriptionbooleancancel(boolean mayInterruptIfRunning)

尝试取消执行此任务。

Vget()

等待计算完成,然后检索其结果。

Vget(long timeout, TimeUnit unit)

如果需要等待最多在给定的时间计算完成,然后检索其结果(如果可用)。

booleanisCancelled()

如果此任务在正常完成之前被取消,则返回 true

booleanisDone()

返回 true如果任务已完成。

package java.util.concurrent;

public interface Future {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
Modifier and TypeMethod and Descriptionbooleancancel(boolean mayInterruptIfRunning)

尝试取消执行此任务。

protected voiddone()

此任务转换到状态 isDone (无论是正常还是通过取消)调用的受保护方法。

Vget()

等待计算完成,然后检索其结果。

Vget(long timeout, TimeUnit unit)

如果需要等待最多在给定的时间计算完成,然后检索其结果(如果可用)。

booleanisCancelled()

如果此任务在正常完成之前取消,则返回 true

booleanisDone()

返回 true如果任务已完成。

voidrun()

将此未来设置为其计算结果,除非已被取消。

protected booleanrunAndReset()

执行计算而不设置其结果,然后将此将来重置为初始状态,如果计算遇到异常或被取消,则不执行此操作。

protected voidset(V v)

将此未来的结果设置为给定值,除非此未来已被设置或已被取消。

protected voidsetException(Throwable t)

导致这个未来报告一个ExecutionException与给定的可抛弃的原因,除非这个未来已经被设置或被取消。

package java.util.concurrent;
import java.util.concurrent.locks.LockSupport;

public class FutureTask implements RunnableFuture {
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;

    @SuppressWarnings("unchecked")
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

    public FutureTask(Callable callable) {
        if (callable == null)
            throw new NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

    public boolean isCancelled() {
        return state >= CANCELLED;
    }

    public boolean isDone() {
        return state != NEW;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s  COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos  tk = Thread.class;
            parkBlockerOffset = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("parkBlocker"));
            SEED = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomSeed"));
            PROBE = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomProbe"));
            SECONDARY = UNSAFE.objectFieldOffset
                (tk.getDeclaredField("threadLocalRandomSecondarySeed"));
        } catch (Exception ex) { throw new Error(ex); }
    }

}

 

 

关注
打赏
1665965058
查看更多评论
立即登录/注册

微信扫码登录

0.0441s