深入学习java源码之Executors.newFixedThreadPool()与Executors.newCachedThreadPool()
Executor框架是指java5中引入的一系列并发库中与executor相关的一些功能类,其中包括线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。他们的关系为:
并发编程的一种编程方式是把任务拆分为一些列的小任务,即Runnable,然后在提交给一个Executor执行,Executor.execute(Runnalbe)。Executor在执行时使用内部的线程池完成操作。
Executor:一个接口,其定义了一个接收Runnable对象的方法executor,其方法签名为executor(Runnable command),
ExecutorService:是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回Future的方法
AbstractExecutorService:ExecutorService执行方法的默认实现
ScheduledExecutorService:一个可定时调度任务的接口
ScheduledThreadPoolExecutor:ScheduledExecutorService的实现,一个可定时调度任务的线程池
ThreadPoolExecutor:线程池,可以通过调用Executors以下静态工厂方法来创建线程池并返回一个ExecutorService对象:
整体关系如下图:
java.util.concurrent.ThreadPoolExecutor 类是线程池中最核心的类之一,因此如果要透彻地了解Java中的线程池,必须先了解这个类。
public class ThreadPoolExecutor extends AbstractExecutorService {
.....
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//代码省略
}
...
}
ThreadPoolExecutor构造函数的各个参数说明
构造器中各个参数的含义:
corePoolSize:核心池的大小。
核心池中的线程会一致保存在线程池中(即使线程空闲),除非调用allowCoreThreadTimeOut方法允许核心线程在空闲后一定时间内销毁,该时间由构造方法中的keepAliveTime和unit参数指定; 在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这两个方法的名字就可以看出,是**“预创建线程”**的意思,即在没有任务到来之前就创建corePoolSize个线程(prestartAllCoreThreads)或者一个线程(prestartCoreThread);maximumPoolSize:线程池允许的最大线程数。这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程。
默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把新加入的任务放到缓存队列当中,缓存队列由构造方法中的workQueue参数指定,如果入队失败(队列已满)则尝试创建临时线程,但临时线程和核心线程的总数不能超过maximumPoolSize,当线程总数达到maximumPoolSize后会拒绝新任务;所以有两种方式可以让任务绝不被拒绝:
① 将maximumPoolSize设置为Integer.MAX_VALUE(线程数不可能达到这个值),CachedThreadPool就是这么做的;
② 使用无限容量的阻塞队列(比如LinkedBlockingQueue),所有处理不过来的任务全部排队去,FixedThreadPool就是这么做的。
keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。
默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用——当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会被销毁,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(true)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒
并发库中所有时间表示方法都是以TimeUnit枚举类作为单位
workQueue:一个阻塞队列(BlockingQueue接口的实现类),用来存储等待执行的任务,一般来说,这里的阻塞队列有以下几种选择:
ArrayBlockingQueue // 数组实现的阻塞队列,数组不支持自动扩容。所以当阻塞队列已满
// 线程池会根据handler参数中指定的拒绝任务的策略决定如何处理后面加入的任务
LinkedBlockingQueue // 链表实现的阻塞队列,默认容量Integer.MAX_VALUE(不限容),
// 当然也可以通过构造方法限制容量
SynchronousQueue // 零容量的同步阻塞队列,添加任务直到有线程接受该任务才返回
// 用于实现生产者与消费者的同步,所以被叫做同步队列
PriorityBlockingQueue // 二叉堆实现的优先级阻塞队列
DelayQueue // 延时阻塞队列,该队列中的元素需要实现Delayed接口
// 底层使用PriorityQueue的二叉堆对Delayed元素排序
// ScheduledThreadPoolExecutor底层就用了DelayQueue的变体"DelayWorkQueue"
// 队列中所有的任务都会封装成ScheduledFutureTask对象(该类已实现Delayed接口)
threadFactory:线程工厂,主要用来创建线程;默认情况都会使用Executors工具类中定义的默认工厂类DefaultThreadFactory。可以实现ThreadFactory接口来自己控制创建线程池的过程(比如设置创建线程的名字、优先级或者是否为Deamon守护线程)
handler:表示当拒绝处理任务时的策略,有以下四种取值(默认为AbortPolicy):
ThreadPoolExecutor.AbortPolicy: 丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
可通过实现RejectedExecutionHandler接口来自定义任务拒绝后的处理策略
线程池状态转换
其中三个高字节位存储了线程池当前的运行状态,线程池状态有以下几个:
// runState is stored in the high-order bits private static final int RUNNING = -1 action)返回一个
Callable对象,当被调用时,它运行给定的特权动作并返回其结果。static Callablecallable(PrivilegedExceptionAction action)返回一个
Callable对象,该对象在被调用时运行给定的特权异常操作并返回其结果。static Callablecallable(Runnable task)返回一个
Callable对象,当被调用时,它运行给定的任务并返回null。static Callablecallable(Runnable task, T result)返回一个
Callable对象,当被调用时,它运行给定的任务并返回给定的结果。static ThreadFactorydefaultThreadFactory()返回用于创建新线程的默认线程工厂。
static ExecutorServicenewCachedThreadPool()创建一个根据需要创建新线程的线程池,但在可用时将重新使用以前构造的线程。
static ExecutorServicenewCachedThreadPool(ThreadFactory threadFactory)创建一个根据需要创建新线程的线程池,但在可用时将重新使用以前构造的线程,并在需要时使用提供的ThreadFactory创建新线程。
static ExecutorServicenewFixedThreadPool(int nThreads)创建一个线程池,该线程池重用固定数量的从共享无界队列中运行的线程。
static ExecutorServicenewFixedThreadPool(int nThreads, ThreadFactory threadFactory)创建一个线程池,重用固定数量的线程,从共享无界队列中运行,使用提供的ThreadFactory在需要时创建新线程。
static ScheduledExecutorServicenewScheduledThreadPool(int corePoolSize)创建一个线程池,可以调度命令在给定的延迟之后运行,或定期执行。
static ScheduledExecutorServicenewScheduledThreadPool(int corePoolSize, ThreadFactory threadFactory)创建一个线程池,可以调度命令在给定的延迟之后运行,或定期执行。
static ExecutorServicenewSingleThreadExecutor()创建一个使用从无界队列运行的单个工作线程的执行程序。
static ExecutorServicenewSingleThreadExecutor(ThreadFactory threadFactory)创建一个使用单个工作线程运行无界队列的执行程序,并在需要时使用提供的ThreadFactory创建一个新线程。
static ScheduledExecutorServicenewSingleThreadScheduledExecutor()创建一个单线程执行器,可以调度命令在给定的延迟之后运行,或定期执行。
static ScheduledExecutorServicenewSingleThreadScheduledExecutor(ThreadFactory threadFactory)创建一个单线程执行器,可以调度命令在给定的延迟之后运行,或定期执行。
static ExecutorServicenewWorkStealingPool()创建使用所有
available processors作为其目标并行级别的工作窃取线程池。static ExecutorServicenewWorkStealingPool(int parallelism)创建一个维护足够的线程以支持给定的并行级别的线程池,并且可以使用多个队列来减少争用。
static CallableprivilegedCallable(Callable callable)返回一个
Callable对象,当被调用时,将在当前访问控制上下文中执行给定的callable。static CallableprivilegedCallableUsingCurrentClassLoader(Callable callable)返回一个
Callable对象,当被调用时,将在当前访问控制上下文中执行给定的callable,当前上下文类加载器作为上下文类加载器。static ThreadFactoryprivilegedThreadFactory()返回一个用于创建与当前线程具有相同权限的新线程的线程工厂。
static ExecutorServiceunconfigurableExecutorService(ExecutorService executor)返回一个将所有定义的
ExecutorService方法委托给给定执行程序的对象,但不能以其他方式使用转换方式访问。static ScheduledExecutorServiceunconfigurableScheduledExecutorService(ScheduledExecutorService executor)返回一个将所有定义的
ScheduledExecutorService方法委托给给定执行程序的对象,但不能以其他方式使用转换方式访问。package java.util.concurrent; import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import java.security.AccessControlContext; import java.security.AccessController; import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; import java.security.PrivilegedActionException; import java.security.AccessControlException; import sun.security.util.SecurityConstants; public class Executors { public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()); } public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())); } public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), threadFactory)); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()); } public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue(), threadFactory); } public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); } public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1, threadFactory)); } public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); } public static ExecutorService unconfigurableExecutorService(ExecutorService executor) { if (executor == null) throw new NullPointerException(); return new DelegatedExecutorService(executor); } public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) { if (executor == null) throw new NullPointerException(); return new DelegatedScheduledExecutorService(executor); } public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory(); } public static ThreadFactory privilegedThreadFactory() { return new PrivilegedThreadFactory(); } public static Callable callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter(task, result); } public static Callable callable(Runnable task) { if (task == null) throw new NullPointerException(); return new RunnableAdapter(task, null); } public static Callable callable(final PrivilegedAction action) { if (action == null) throw new NullPointerException(); return new Callable() { public Object call() { return action.run(); }}; } public static Callable callable(final PrivilegedExceptionAction action) { if (action == null) throw new NullPointerException(); return new Callable() { public Object call() throws Exception { return action.run(); }}; } public static Callable privilegedCallable(Callable callable) { if (callable == null) throw new NullPointerException(); return new PrivilegedCallable(callable); } public static Callable privilegedCallableUsingCurrentClassLoader(Callable callable) { if (callable == null) throw new NullPointerException(); return new PrivilegedCallableUsingCurrentClassLoader(callable); } // Non-public classes supporting the public methods static final class RunnableAdapter implements Callable { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } } static final class PrivilegedCallable implements Callable { private final Callable task; private final AccessControlContext acc; PrivilegedCallable(Callable task) { this.task = task; this.acc = AccessController.getContext(); } public T call() throws Exception { try { return AccessController.doPrivileged( new PrivilegedExceptionAction() { public T run() throws Exception { return task.call(); } }, acc); } catch (PrivilegedActionException e) { throw e.getException(); } } } static final class PrivilegedCallableUsingCurrentClassLoader implements Callable { private final Callable task; private final AccessControlContext acc; private final ClassLoader ccl; PrivilegedCallableUsingCurrentClassLoader(Callable task) { SecurityManager sm = System.getSecurityManager(); if (sm != null) { // Calls to getContextClassLoader from this class // never trigger a security check, but we check // whether our callers have this permission anyways. sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION); // Whether setContextClassLoader turns out to be necessary // or not, we fail fast if permission is not available. sm.checkPermission(new RuntimePermission("setContextClassLoader")); } this.task = task; this.acc = AccessController.getContext(); this.ccl = Thread.currentThread().getContextClassLoader(); } public T call() throws Exception { try { return AccessController.doPrivileged( new PrivilegedExceptionAction() { public T run() throws Exception { Thread t = Thread.currentThread(); ClassLoader cl = t.getContextClassLoader(); if (ccl == cl) { return task.call(); } else { t.setContextClassLoader(ccl); try { return task.call(); } finally { t.setContextClassLoader(cl); } } } }, acc); } catch (PrivilegedActionException e) { throw e.getException(); } } } static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } } static class PrivilegedThreadFactory extends DefaultThreadFactory { private final AccessControlContext acc; private final ClassLoader ccl; PrivilegedThreadFactory() { super(); SecurityManager sm = System.getSecurityManager(); if (sm != null) { // Calls to getContextClassLoader from this class // never trigger a security check, but we check // whether our callers have this permission anyways. sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION); // Fail fast sm.checkPermission(new RuntimePermission("setContextClassLoader")); } this.acc = AccessController.getContext(); this.ccl = Thread.currentThread().getContextClassLoader(); } public Thread newThread(final Runnable r) { return super.newThread(new Runnable() { public void run() { AccessController.doPrivileged(new PrivilegedAction() { public Void run() { Thread.currentThread().setContextClassLoader(ccl); r.run(); return null; } }, acc); } }); } } static class DelegatedExecutorService extends AbstractExecutorService { private final ExecutorService e; DelegatedExecutorService(ExecutorService executor) { e = executor; } public void execute(Runnable command) { e.execute(command); } public void shutdown() { e.shutdown(); } public List shutdownNow() { return e.shutdownNow(); } public boolean isShutdown() { return e.isShutdown(); } public boolean isTerminated() { return e.isTerminated(); } public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { return e.awaitTermination(timeout, unit); } public Future submit(Runnable task) { return e.submit(task); } public Future submit(Callable task) { return e.submit(task); } public Future submit(Runnable task, T result) { return e.submit(task, result); } public List invokeAll(Collection && ((Future)r).isCancelled()) it.remove(); } } catch (ConcurrentModificationException fallThrough) { // Take slow path if we encounter interference during traversal. // Make copy for traversal and call remove for cancelled entries. // The slow path is more likely to be O(N*N). for (Object r : q.toArray()) if (r instanceof Future && ((Future)r).isCancelled()) q.remove(r); } tryTerminate(); // In case SHUTDOWN and now empty } public int getPoolSize() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Remove rare and surprising possibility of // isTerminated() && getPoolSize() > 0 return runStateAtLeast(ctl.get(), TIDYING) ? 0 : workers.size(); } finally { mainLock.unlock(); } } public int getActiveCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { int n = 0; for (Worker w : workers) if (w.isLocked()) ++n; return n; } finally { mainLock.unlock(); } } public int getLargestPoolSize() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { return largestPoolSize; } finally { mainLock.unlock(); } } public long getTaskCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { long n = completedTaskCount; for (Worker w : workers) { n += w.completedTasks; if (w.isLocked()) ++n; } return n + workQueue.size(); } finally { mainLock.unlock(); } } public long getCompletedTaskCount() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { long n = completedTaskCount; for (Worker w : workers) n += w.completedTasks; return n; } finally { mainLock.unlock(); } } public String toString() { long ncompleted; int nworkers, nactive; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { ncompleted = completedTaskCount; nactive = 0; nworkers = workers.size(); for (Worker w : workers) { ncompleted += w.completedTasks; if (w.isLocked()) ++nactive; } } finally { mainLock.unlock(); } int c = ctl.get(); String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" : (runStateAtLeast(c, TERMINATED) ? "Terminated" : "Shutting down")); return super.toString() + "[" + rs + ", pool size = " + nworkers + ", active threads = " + nactive + ", queued tasks = " + workQueue.size() + ", completed tasks = " + ncompleted + "]"; } protected void beforeExecute(Thread t, Runnable r) { } protected void afterExecute(Runnable r, Throwable t) { } protected void terminated() { } public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } } public static class AbortPolicy implements RejectedExecutionHandler { /** * Creates an {@code AbortPolicy}. */ public AbortPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } } public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } } public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } } }Modifier and TypeMethod and Description
protected RunnableScheduledFuturedecorateTask(Callable callable, RunnableScheduledFuture task)修改或替换用于执行可调用的任务。
protected RunnableScheduledFuturedecorateTask(Runnable runnable, RunnableScheduledFuture task)修改或替换用于执行runnable的任务。
voidexecute(Runnable command)执行
command零要求延迟。booleangetContinueExistingPeriodicTasksAfterShutdownPolicy()获得关于是否继续执行现有定期任务的策略,即使该执行者已经是
shutdown。booleangetExecuteExistingDelayedTasksAfterShutdownPolicy()获得有关是否执行现有延迟任务的政策,即使这个执行者已经是
shutdown。BlockingQueuegetQueue()返回此执行程序使用的任务队列。
booleangetRemoveOnCancelPolicy()获取关于在取消时是否应立即将已取消任务从工作队列中删除的策略。
ScheduledFutureschedule(Callable callable, long delay, TimeUnit unit)创建并执行在给定延迟后启用的ScheduledFuture。
ScheduledFutureschedule(Runnable command, long delay, TimeUnit unit)创建并执行在给定延迟后启用的单次操作。
ScheduledFuturescheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)创建并执行在给定的初始延迟之后,随后以给定的时间段首先启用的周期性动作;那就是执行将在
initialDelay之后开始,然后initialDelay+period,然后是initialDelay + 2 * period等等。ScheduledFuturescheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)创建并执行在给定的初始延迟之后首先启用的定期动作,随后在一个执行的终止和下一个执行的开始之间给定的延迟。
voidsetContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value)设置关于是否继续执行现有周期性任务的策略,即使该执行者已经是
shutdown。voidsetExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value)设置关于是否执行现有延迟任务的策略,即使该执行者已经是
shutdown。voidsetRemoveOnCancelPolicy(boolean value)设置取消时取消任务是否应立即从工作队列中删除的策略。
voidshutdown()启动有序关闭,其中先前提交的任务将被执行,但不会接受任何新任务。
ListshutdownNow()尝试停止所有主动执行的任务,停止等待任务的处理,并返回正在等待执行的任务列表。
Futuresubmit(Callable task)提交值返回任务以执行,并返回代表任务待处理结果的Future。
Futuresubmit(Runnable task)提交一个可运行的任务执行,并返回一个表示该任务的未来。
Futuresubmit(Runnable task, T result)提交一个可运行的任务执行,并返回一个表示该任务的未来。
package java.util.concurrent; import static java.util.concurrent.TimeUnit.NANOSECONDS; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.*; public class ScheduledThreadPoolExecutor extends ThreadPoolExecutor implements ScheduledExecutorService { private volatile boolean continueExistingPeriodicTasksAfterShutdown; private volatile boolean executeExistingDelayedTasksAfterShutdown = true; private volatile boolean removeOnCancel = false; private static final AtomicLong sequencer = new AtomicLong(); final long now() { return System.nanoTime(); } private class ScheduledFutureTask extends FutureTask implements RunnableScheduledFuture { /** Sequence number to break ties FIFO */ private final long sequenceNumber; /** The time the task is enabled to execute in nanoTime units */ private long time; private final long period; /** The actual task to be re-enqueued by reExecutePeriodic */ RunnableScheduledFuture outerTask = this; int heapIndex; ScheduledFutureTask(Runnable r, V result, long ns) { super(r, result); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } ScheduledFutureTask(Runnable r, V result, long ns, long period) { super(r, result); this.time = ns; this.period = period; this.sequenceNumber = sequencer.getAndIncrement(); } ScheduledFutureTask(Callable callable, long ns) { super(callable); this.time = ns; this.period = 0; this.sequenceNumber = sequencer.getAndIncrement(); } public long getDelay(TimeUnit unit) { return unit.convert(time - now(), NANOSECONDS); } public int compareTo(Delayed other) { if (other == this) // compare zero if same object return 0; if (other instanceof ScheduledFutureTask) { ScheduledFutureTask x = (ScheduledFutureTask)other; long diff = time - x.time; if (diff < 0) return -1; else if (diff > 0) return 1; else if (sequenceNumber < x.sequenceNumber) return -1; else return 1; } long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS); return (diff < 0) ? -1 : (diff > 0) ? 1 : 0; } public boolean isPeriodic() { return period != 0; } private void setNextRunTime() { long p = period; if (p > 0) time += p; else time = triggerTime(-p); } public boolean cancel(boolean mayInterruptIfRunning) { boolean cancelled = super.cancel(mayInterruptIfRunning); if (cancelled && removeOnCancel && heapIndex >= 0) remove(this); return cancelled; } public void run() { boolean periodic = isPeriodic(); if (!canRunInCurrentRunState(periodic)) cancel(false); else if (!periodic) ScheduledFutureTask.super.run(); else if (ScheduledFutureTask.super.runAndReset()) { setNextRunTime(); reExecutePeriodic(outerTask); } } } boolean canRunInCurrentRunState(boolean periodic) { return isRunningOrShutdown(periodic ? continueExistingPeriodicTasksAfterShutdown : executeExistingDelayedTasksAfterShutdown); } private void delayedExecute(RunnableScheduledFuture task) { if (isShutdown()) reject(task); else { super.getQueue().add(task); if (isShutdown() && !canRunInCurrentRunState(task.isPeriodic()) && remove(task)) task.cancel(false); else ensurePrestart(); } } void reExecutePeriodic(RunnableScheduledFuture task) { if (canRunInCurrentRunState(true)) { super.getQueue().add(task); if (!canRunInCurrentRunState(true) && remove(task)) task.cancel(false); else ensurePrestart(); } } @Override void onShutdown() { BlockingQueue q = super.getQueue(); boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy(); boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy(); if (!keepDelayed && !keepPeriodic) { for (Object e : q.toArray()) if (e instanceof RunnableScheduledFuture) ((RunnableScheduledFuture) e).cancel(false); q.clear(); } else { // Traverse snapshot to avoid iterator exceptions for (Object e : q.toArray()) { if (e instanceof RunnableScheduledFuture) { RunnableScheduledFuture t = (RunnableScheduledFuture)e; if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) || t.isCancelled()) { // also remove if already cancelled if (q.remove(t)) t.cancel(false); } } } } tryTerminate(); } protected RunnableScheduledFuture decorateTask( Runnable runnable, RunnableScheduledFuture task) { return task; } protected RunnableScheduledFuture decorateTask( Callable callable, RunnableScheduledFuture task) { return task; } public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory); } public ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), handler); } public ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue(), threadFactory, handler); } private long triggerTime(long delay, TimeUnit unit) { return triggerTime(unit.toNanos((delay < 0) ? 0 : delay)); } long triggerTime(long delay) { return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay)); } private long overflowFree(long delay) { Delayed head = (Delayed) super.getQueue().peek(); if (head != null) { long headDelay = head.getDelay(NANOSECONDS); if (headDelay < 0 && (delay - headDelay < 0)) delay = Long.MAX_VALUE + headDelay; } return delay; } public ScheduledFuture schedule(Runnable command, long delay, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture t = decorateTask(command, new ScheduledFutureTask(command, null, triggerTime(delay, unit))); delayedExecute(t); return t; } public ScheduledFuture schedule(Callable callable, long delay, TimeUnit unit) { if (callable == null || unit == null) throw new NullPointerException(); RunnableScheduledFuture t = decorateTask(callable, new ScheduledFutureTask(callable, triggerTime(delay, unit))); delayedExecute(t); return t; } public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) { if (command == null || unit == null) throw new NullPointerException(); if (period submit(Runnable task) { return schedule(task, 0, NANOSECONDS); } public Future submit(Runnable task, T result) { return schedule(Executors.callable(task, result), 0, NANOSECONDS); } public Future submit(Callable task) { return schedule(task, 0, NANOSECONDS); } public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) { continueExistingPeriodicTasksAfterShutdown = value; if (!value && isShutdown()) onShutdown(); } public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() { return continueExistingPeriodicTasksAfterShutdown; } public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) { executeExistingDelayedTasksAfterShutdown = value; if (!value && isShutdown()) onShutdown(); } public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() { return executeExistingDelayedTasksAfterShutdown; } public void setRemoveOnCancelPolicy(boolean value) { removeOnCancel = value; } public boolean getRemoveOnCancelPolicy() { return removeOnCancel; } public void shutdown() { super.shutdown(); } public List shutdownNow() { return super.shutdownNow(); } public BlockingQueue getQueue() { return super.getQueue(); } static class DelayedWorkQueue extends AbstractQueue implements BlockingQueue { private static final int INITIAL_CAPACITY = 16; private RunnableScheduledFuture[] queue = new RunnableScheduledFuture[INITIAL_CAPACITY]; private final ReentrantLock lock = new ReentrantLock(); private int size = 0; private Thread leader = null; /** * Condition signalled when a newer task becomes available at the * head of the queue or a new thread may need to become leader. */ private final Condition available = lock.newCondition(); /** * Sets f's heapIndex if it is a ScheduledFutureTask. */ private void setIndex(RunnableScheduledFuture f, int idx) { if (f instanceof ScheduledFutureTask) ((ScheduledFutureTask)f).heapIndex = idx; } private void siftUp(int k, RunnableScheduledFuture key) { while (k > 0) { int parent = (k - 1) >>> 1; RunnableScheduledFuture e = queue[parent]; if (key.compareTo(e) >= 0) break; queue[k] = e; setIndex(e, k); k = parent; } queue[k] = key; setIndex(key, k); } private void siftDown(int k, RunnableScheduledFuture key) { int half = size >>> 1; while (k < half) { int child = (k replacement = queue[s]; queue[s] = null; if (s != i) { siftDown(i, replacement); if (queue[i] == replacement) siftUp(i, replacement); } return true; } finally { lock.unlock(); } } public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return size; } finally { lock.unlock(); } } public boolean isEmpty() { return size() == 0; } public int remainingCapacity() { return Integer.MAX_VALUE; } public RunnableScheduledFuture peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return queue[0]; } finally { lock.unlock(); } } public boolean offer(Runnable x) { if (x == null) throw new NullPointerException(); RunnableScheduledFuture e = (RunnableScheduledFuture)x; final ReentrantLock lock = this.lock; lock.lock(); try { int i = size; if (i >= queue.length) grow(); size = i + 1; if (i == 0) { queue[0] = e; setIndex(e, 0); } else { siftUp(i, e); } if (queue[0] == e) { leader = null; available.signal(); } } finally { lock.unlock(); } return true; } public void put(Runnable e) { offer(e); } public boolean add(Runnable e) { return offer(e); } public boolean offer(Runnable e, long timeout, TimeUnit unit) { return offer(e); } private RunnableScheduledFuture finishPoll(RunnableScheduledFuture f) { int s = --size; RunnableScheduledFuture x = queue[s]; queue[s] = null; if (s != 0) siftDown(0, x); setIndex(f, -1); return f; } public RunnableScheduledFuture poll() { final ReentrantLock lock = this.lock; lock.lock(); try { RunnableScheduledFuture first = queue[0]; if (first == null || first.getDelay(NANOSECONDS) > 0) return null; else return finishPoll(first); } finally { lock.unlock(); } } public RunnableScheduledFuture take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { RunnableScheduledFuture first = queue[0]; if (first == null) available.await(); else { long delay = first.getDelay(NANOSECONDS); if (delay first = queue[0]; if (first == null) { if (nanos first; int n = 0; while ((first = peekExpired()) != null) { c.add(first); // In this order, in case add() throws. finishPoll(first); ++n; } return n; } finally { lock.unlock(); } } public int drainTo(Collection[] array) { this.array = array; } public boolean hasNext() { return cursor < array.length; } public Runnable next() { if (cursor >= array.length) throw new NoSuchElementException(); lastRet = cursor; return array[cursor++]; } public void remove() { if (lastRet < 0) throw new IllegalStateException(); DelayedWorkQueue.this.remove(array[lastRet]); lastRet = -1; } } } }
