深入学习java源码之ForkJoinTask.doJoin()与ForkJoinPool.execute()
ForkJoinTask:
简单的说,ForkJoinTask将任务fork成足够小的任务,并发解决这些小任务,然后将这些小任务结果join。这种思想充分利用了CPU的多核系统,使得CPU的利用率得到大幅度提升,减少了任务执行时间。
通常我们会利用ForkJoinTask的fork方法来分割任务,利用join方法来合并任务,因此我们首先以这两个方法作为切入
分割子任务-fork
public final ForkJoinTask fork() {
((ForkJoinWorkerThread) Thread.currentThread()).pushTask(this);
return this;
}
该方法很简单,将该任务添加到当前线程维护的队列队首处,返回该任务。这里有三点需要注意的:
fork方法将当前线程强制转换成ForkJoinWorkerThread,通过ForkJoinPool执行ForkJoinTask的线程都是该框架定义的ForkJoinWorkerThread,因此这种转换是正确的。如果不是利用ForkJoinPool线程池执行ForkJoinTask,将Thread强制转换成ForkJoinWorkerThread会抛出ClassCastException异常。 将任务加到队列后,由ForkJoinPool调度执行该任务。一般情况下,该任务会被队列所在的线程执行,当线程池其他线程空闲的时候,该任务可能被其他线程窃取。 任务被添加到队列的队首处,即本线程fork的小任务被添加到队首,这隐藏着另一个意思,队列中越早的任务是个较大的任务,刚添加的任务是较小的任务,每次该线程从队首拿更小的任务执行,更小的任务执行完成之后才能join成更大的任务。其他线程从该队列窃取任务的时候会窃取队列最早的任务,即窃取了较大的任务执行。
合并结果-join
假设我们将当前任务t分割成两个任务t1和t2,为了获取任务t的结果,需要等待任务t1和t2的结果,代码片断通常是这种形式:
t1.fork();
t2.fork();
result = t1.join() + t2.join();
join方法正是阻塞等待当前任务执行结束并返回结果。如果任务非正常结束,join方法可能会抛出异常。join结果的时候,如果线程维护的队列头就是该任务,那么直接执行该任务,否则还有更小的任务需要执行,等待该线程执行完该任务。
和很多其他JUC框架类似,ForkJoinTask也有自己的任务执行状态,先学习下ForkJoinTask的几个状态:
volatile int status;
private static final int NORMAL = -1;
private static final int CANCELLED = -2;
private static final int EXCEPTIONAL = -3;
private static final int SIGNAL = 1;
status是一个volatile变量,表示当前任务的执行状态,它有五个状态,负数表示该任务已经执行完成,非负数表示任务还没有执行完成。其中已经执行完成状态又包括NORMAL、CANCELLED、EXCEPTIONAL三种状态,未完成状态包括初始状态0和SIGNAL。下面详细看下每个状态:
NORMAL:表示任务“正常”完成的状态。 CANCELLED:表示任务“取消”完成的状态。 EXCEPTIONAL:表示任务“异常”完成的状态。注意,以上这三个状态都是“完成”的状态,只是完成的途径不一样。 SIGNAL:有其他任务依赖当前任务,任务结束前,通知其他任务join当前任务的结果。 0:任务初始状态(正在执行状态),不需要等待子任务完成。
join方法调用了doJoin方法,doJoin方法有一个关键代码片断:
//任务正常完成,设置正常完成状态,通知其他需要join该任务的线程
if (completed)
return setCompletion(NORMAL);
当前任务完成后,设置该任务的完成状态为NORMAL,并且notifyAll(唤醒)其他在该任务上等待的线程。其他线程被唤醒后会合并该任务的执行结果。既然有notifyAll,那对应的wait在哪里呢?ForkJoinPool调用了ForkJoinTask的tryAwaitDone方法,等待任务完成。
doJoin方法等待该任务完成,返回完成时的状态(NORMAL、CANCELLED、EXCEPTIONAL)。
如果执行当前任务的线程不是ForkJoinWorkerThread,调用externalAwaitDone方法等待任务执行完成。否则,从当前线程维护的队列取队首的任务,如果队首的任务不是当前的任务或者任务未完成,调用当前线程的joinTask方法将当前任务加入到等待队列并等待该任务执行完成。
exec是一个抽象方法,完成具体任务的代码,由子类实现,该方法返回任务是否正常完成。任务执行过程如果抛出异常,捕获异常并设置异常完成状态。如果任务正常完成,设置正常状态并通知其他需要join该任务的线程,其他需要join该任务的线程通常是一个等待父任务完成的线程,也就是说,此时当前任务其实是个子任务,子任务结束后,父任务就可以尝试合并子任务的执行结果了,看下示例图:
任务执行过程抛出异常时,调用者可以获取该异常,ForkJoinTask并没有直接将异常的任务保存起来,而是保存了异常任务的弱引用,在合适的时候,GC将会回收该异常任务,被回收对象对应的弱引用将会保存在弱引用队列中。
private int setExceptionalCompletion(Throwable ex) {
//System.identityHashCode和Object.hashCode返回的值一样,
//都是根据对象在内存中的地址计算出来的哈希码
int h = System.identityHashCode(this);
//操作异常任务表之前先获取锁
final ReentrantLock lock = exceptionTableLock;
lock.lock();
try {
//删除已经被回收对象对应的弱引用,该方法会遍历exceptionTableRefQueue,并删除exceptionTable中
//对应的弱引用
expungeStaleExceptions();
ExceptionNode[] t = exceptionTable;
//将执行过程抛出异常的任务弱引用保存到exceptionTable,这里其实是将
//exceptionTable当作哈希表使用,i就是保存的位置
int i = h & (t.length - 1);
//遍历哈希表索引i处的链表,如果遍历过程中发现已经存在该任务,跳出循环,否则
//遍历到链表末尾时,创建新的ExceptionNode,并将该节点放到链表的头部
for (ExceptionNode e = t[i]; ; e = e.next) {
if (e == null) {
t[i] = new ExceptionNode(this, ex, t[i]);
break;
}
if (e.get() == this)
break;
}
} finally {
lock.unlock();
}
//设置任务的完成状态为EXCEPTIONAL
return setCompletion(EXCEPTIONAL);
}
get方法等待任务执行完成并返回任务计算结果,看下源码:
public final V get() throws InterruptedException, ExecutionException {
int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
doJoin() : externalInterruptibleAwaitDone(0L);
Throwable ex;
if (s == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
throw new ExecutionException(ex);
return getRawResult();
}
如果当前线程是ForkJoinWorkerThread,调用doJoin方法获取结果,该方法前面已经讲过了。如果当前线程不是ForkerJoinWorkerThread,调用externalInterruptibleAwaitDone方法。
任务执行完成返回后,如果任务完成状态是CANCELLED,抛出CancellationException异常。如果任务完成状态是EXCEPTIONAL,将任务执行过程中抛出的异常包装成ExecutionExcepiton重新抛出。
重点看下getThrowableException方法,该方法返回当前任务执行过程中抛出的异常,看下源码:
private Throwable getThrowableException() {
//如果任务状态不是EXCEPTIONAL,返回null
if (status != EXCEPTIONAL)
return null;
int h = System.identityHashCode(this);
ExceptionNode e;
final ReentrantLock lock = exceptionTableLock;
lock.lock();
try {
expungeStaleExceptions();
ExceptionNode[] t = exceptionTable;
e = t[h & (t.length - 1)];
//从哈希表exceptionTable中找到当前任务抛出的异常
while (e != null && e.get() != this)
e = e.next;
} finally {
lock.unlock();
}
Throwable ex;
if (e == null || (ex = e.ex) == null)
return null;
//如果该异常不是由当前线程抛出的,通过该异常的无参构造函数,或者只有一个Throwable参数的构造函数
//新建一个异常并返回
if (e.thrower != Thread.currentThread().getId()) {
Class ec = ex.getClass();
try {
Constructor noArgCtor = null;
Constructor[] cs = ec.getConstructors();
for (int i = 0; i < cs.length; ++i) {
Constructor c = cs[i];
Class[] ps = c.getParameterTypes();
if (ps.length == 0)
noArgCtor = c;
else if (ps.length == 1 && ps[0] == Throwable.class)
return (Throwable)(c.newInstance(ex));
}
if (noArgCtor != null) {
Throwable wx = (Throwable)(noArgCtor.newInstance());
wx.initCause(ex);
return wx;
}
} catch (Exception ignore) {
}
}
return ex;
}
该方法从异常表exceptionTable中取当前任务抛出的异常。如果抛出该异常的不是当前线程,查找该异常类对应的无参构造函数、或者只有一个参数Throwable的构造函数,通过该构造函数和反射,创建一个该异常的实例并返回。
getPool返回执行该任务线程所在的线程池,inForkJoinPool返回该任务是否由FJ线程执行。
public static ForkJoinPool getPool() {
Thread t = Thread.currentThread();
return (t instanceof ForkJoinWorkerThread) ?
((ForkJoinWorkerThread) t).pool : null;
}
public static boolean inForkJoinPool() {
return Thread.currentThread() instanceof ForkJoinWorkerThread;
}
tryUnfork方法尝试将该任务从任务队列中弹出。该任务不再被线程池调度。
public boolean tryUnfork() {
return ((ForkJoinWorkerThread) Thread.currentThread())
.unpushTask(this);
}
getQueuedTaskCount方法返回当前线程已经fork但是没有执行的任务数量。
public static int getQueuedTaskCount() {
//返回任务队列中任务的数量
return ((ForkJoinWorkerThread) Thread.currentThread())
.getQueueSize();
}
总结:
1,可以使用invokeAll(task)方法,主动执行其它的ForkJoinTask,并等待Task完成。(是同步的)
2,还可以使用fork方法,让一个task执行(这个方法是异步的)
3,还可以使用join方法,让一个task执行(这个方法是同步的,它和fork不同点是同步或者异步的区别)
4,可以使用join来取得ForkJoinTask的返回值。由于RecursiveTask类实现了Future接口,所以也可以使用get()取得返回值。 get()和join()有两个主要的区别: join()方法不能被中断。如果你中断调用join()方法的线程,这个方法将抛出InterruptedException异常。 如果任务抛出任何未受检异常,get()方法将返回一个ExecutionException异常,而join()方法将返回一个RuntimeException异常。
5,ForkJoinTask在不显示使用ForkJoinPool.execute/invoke/submit()方法进行执行的情况下,也可以使用自己的fork/invoke方法进行执行。 使用fork/invoke方法执行时,其实原理也是在ForkJoinPool里执行,只不过使用的是一个“在ForkJoinPool内部生成的静态的”ForkJoinPool。
6,ForkJoinTask有两个子类,RecursiveAction和RecursiveTask。他们之间的区别是,RecursiveAction没有返回值,RecursiveTask有返回值。
7,看看ForkjoinTask的Complete方法的使用场景 这个方法好要是用来使一个任务结束。这个方法被用在结束异步任务上,或者为那些能不正常结束的任务,提供一个选择。
8,Task的completeExceptionally方法是怎么回事。 这个方法被用来,在异步的Task中产生一个exception,或者强制结束那些“不会结束”的任务 这个方法是在Task想要“自己结束自己”时,可以被使用。而cancel方法,被设计成被其它TASK调用。 当你在一个任务中抛出一个未检查异常时,它也影响到它的父任务(把它提交到ForkJoinPool类的任务)和父任务的父任务,以此类推。
ForkJoinPool:
为什么使用ForkJoinPool ThreadPoolExecutor中每个任务都是由单个线程独立处理的,如果出现一个非常耗时的大任务(比如大数组排序),就可能出现线程池中只有一个线程在处理这个大任务,而其他线程却空闲着,这会导致CPU负载不均衡:空闲的处理器无法帮助工作繁忙的处理器。
ForkJoinPool就是用来解决这种问题的:将一个大任务拆分成多个小任务后,使用fork可以将小任务分发给其他线程同时处理,使用join可以将多个线程处理的结果进行汇总;这实际上就是分治思想的并行版本。
ForkJoinPool的基本原理
ForkJoinPool 类是Fork/Join 框架的核心,和ThreadPoolExecutor一样它也是ExecutorService接口的实现类。
虽说了ForkJoinPool会把大任务拆分成多个子任务,但是ForkJoinPool并不会为每个子任务创建单独的线程。相反,池中每个线程都有自己的双端队列(Deque)用于存储任务。这个双端队列对于工作窃取算法至关重要。
public class ForkJoinWorkerThread extends Thread {
final ForkJoinPool pool; // 工作线程所在的线程池
final ForkJoinPool.WorkQueue workQueue; // 线程的工作队列(这个双端队列是work-stealing机制的核心)
...
}
ForkJoinPool的两大核心就是分而治之(Divide and conquer)和工作窃取(Work Stealing)算法
工作窃取算法
Work Stealing算法是Fork/Join框架的核心思想:
每个线程都有自己的一个WorkQueue,该工作队列是一个双端队列。 队列支持三个功能push、pop、poll push/pop只能被队列的所有者线程调用,而poll可以被其他线程调用。 划分的子任务调用fork时,都会被push到自己的队列中。 默认情况下,工作线程从自己的双端队列获出任务并执行。 当自己的队列为空时,线程随机从另一个线程的队列末尾调用poll方法窃取任务。
创建ForkJoinPool对象
使用Executors工具类
Java8在Executors工具类中新增了两个工厂方法:
// parallelism定义并行级别
public static ExecutorService newWorkStealingPool(int parallelism);
// 默认并行级别为JVM可用的处理器个数
// Runtime.getRuntime().availableProcessors()
public static ExecutorService newWorkStealingPool();
使用ForkJoinPool内部已经初始化好的commonPool
public static ForkJoinPool commonPool();
// 类静态代码块中会调用makeCommonPool方法初始化一个commonPool
使用构造器创建
public ForkJoinPool() {
this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism) {
this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism),
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE, // 队列工作模式
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
前两个构造器最终都是调用第三个构造器,下面解释一下第四个构造器中各个参数的含义:
parallelism:并行级别,通常默认为JVM可用的处理器个数Runtime.getRuntime().availableProcessors()
factory:用于创建ForkJoinPool中使用的线程。
public static interface ForkJoinWorkerThreadFactory {
public ForkJoinWorkerThread newThread(ForkJoinPool pool);
}
ForkJoinPool管理的线程均是扩展自Thread类的ForkJoinWorkerThread类型(里面包含了一个双端队列)。
handler:用于处理工作线程未处理的异常,默认为null。
asyncMode:用于控制WorkQueue的工作模式
ForkJoinTask 大多数情况下,我们都是直接提交ForkJoinTask对象到ForkJoinPool中。
因为ForkJoinTask有以下三个核心方法:
fork():在任务执行过程中将大任务划分为多个小的子任务,调用子任务的fork()方法可以将任务放到线程池中异步调度。
join():调用子任务的join()方法等待任务返回的结果。这个方法类似于Thread.join(),区别在于前者不受线程中断机制的影响。
如果子任务中有运行时异常,join()会抛出异常,quietlyJoin()方法不会抛出异常也不会返回结果,需要你调用getException()或getRawResult()自己去处理异常和结果。
invoke():在当前线程同步执行该任务。该方法也不受中断机制影响。
如果子任务中有运行时异常,invoke()会抛出异常,quietlyInvoke()方法不会抛出异常也不会返回结果,需要你调用getException()或getRawResult()自己去处理异常和结果。
ForkJoinTask中join(),invoke()都不受中断机制影响,内部调用externalAwaitDone()方法实现
如果是在ForkJoinTask内部调用get()方法,本质上和join()方法一样都是调用externalAwaitDone()。
但如果是在ForkJoinTask外部调用get()方法,这时会受线程中断机制影响,因为内部是通过调用externalInterruptibleAwaitDone()方法实现的。
public final V get() throws InterruptedException, ExecutionException {
int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
doJoin() : externalInterruptibleAwaitDone();
...
}
ForkJoinTask内部维护了四个状态:
总结:
1,可以使用ForkJoinPool.execute(异步,不返回结果)/invoke(同步,返回结果)/submit(异步,返回结果)方法,来执行ForkJoinTask。
2,ForkJoinPool有一个方法commonPool(),这个方法返回一个ForkJoinPool内部声明的静态ForkJoinPool实例。 文档上说,这个方法适用于大多数的应用。这个静态实例的初始线程数,为“CPU核数-1 ”(Runtime.getRuntime().availableProcessors() - 1)。 ForkJoinTask自己启动时,使用的就是这个静态实例。
Fork/Join的陷阱与注意事项 使用Fork/Join框架时,需要注意一些陷阱
避免不必要的fork() 划分成两个子任务后,不要同时调用两个子任务的fork()方法。
表面上看上去两个子任务都fork(),然后join()两次似乎更自然。但事实证明,直接调用compute()效率更高。因为直接调用子任务的compute()方法实际上就是在当前的工作线程进行了计算(线程重用),这比“将子任务提交到工作队列,线程又从工作队列中拿任务”快得多。
当一个大任务被划分成两个以上的子任务时,尽可能使用前面说到的三个衍生的invokeAll方法,因为使用它们能避免不必要的fork()。
注意fork()、compute()、join()的顺序
为了两个任务并行,三个方法的调用顺序需要万分注意。
right.fork(); // 计算右边的任务
long leftAns = left.compute(); // 计算左边的任务(同时右边任务也在计算)
long rightAns = right.join(); // 等待右边的结果
return leftAns + rightAns;
如果我们写成:
left.fork(); // 计算完左边的任务
long leftAns = left.join(); // 等待左边的计算结果
long rightAns = right.compute(); // 再计算右边的任务
return leftAns + rightAns;
或者
long rightAns = right.compute(); // 计算完右边的任务
left.fork(); // 再计算左边的任务
long leftAns = left.join(); // 等待左边的计算结果
return leftAns + rightAns;
下面两种实际上都没有并行。
选择合适的子任务粒度 选择划分子任务的粒度(顺序执行的阈值)很重要,因为使用Fork/Join框架并不一定比顺序执行任务的效率高:如果任务太大,则无法提高并行的吞吐量;如果任务太小,子任务的调度开销可能会大于并行计算的性能提升,我们还要考虑创建子任务、fork()子任务、线程调度以及合并子任务处理结果的耗时以及相应的内存消耗。
官方文档给出的粗略经验是:任务应该执行100~10000个基本的计算步骤。决定子任务的粒度的最好办法是实践,通过实际测试结果来确定这个阈值才是“上上策”。
和其他Java代码一样,Fork/Join框架测试时需要“预热”或者说执行几遍才会被JIT(Just-in-time)编译器优化,所以测试性能之前跑几遍程序很重要。
避免重量级任务划分与结果合并
Fork/Join的很多使用场景都用到数组或者List等数据结构,子任务在某个分区中运行,最典型的例子如并行排序和并行查找。拆分子任务以及合并处理结果的时候,应该尽量避免System.arraycopy这样耗时耗空间的操作,从而最小化任务的处理开销。
异常处理 Java的受检异常机制一直饱受诟病,所以在ForkJoinTask的invoke()、join()方法及其衍生方法中都没有像get()方法那样抛出个ExecutionException的受检异常。
所以你可以在ForkJoinTask中看到内部把受检异常转换成了运行时异常。
static void rethrow(Throwable ex) {
if (ex != null)
ForkJoinTask.uncheckedThrow(ex);
}
@SuppressWarnings("unchecked")
static void uncheckedThrow(Throwable t) throws T {
throw (T)t; // rely on vacuous cast
}
关于Java你不知道的10件事中已经指出,JVM实际并不关心这个异常是受检异常还是运行时异常,受检异常这东西完全是给Java编译器用的:用于警告程序员这里有个异常没有处理。
但不可否认的是invoke、join()仍可能会抛出运行时异常,所以ForkJoinTask还提供了两个不提取结果和异常的方法quietlyInvoke()、quietlyJoin(),这两个方法允许你在所有任务完成后对结果和异常进行处理。
使用quitelyInvoke()和quietlyJoin()时可以配合isCompletedAbnormally()和isCompletedNormally()方法使用。
java源码
Modifier and TypeMethod and Descriptionstatic ForkJoinTaskadapt(Callable... tasks) 叉出给定的任务,当每个任务保持isDone时 isDone ,或遇到(未检查)异常,在这种情况下,异常被重新引导。
static voidinvokeAll(ForkJoinTask t1, ForkJoinTask t2) 叉出给定的任务,当每个任务保持isDone时 isDone ,或者遇到(未检查)异常,在这种情况下,异常被重新引导。
booleanisCancelled() 如果此任务在正常完成之前被取消,则返回 true 。
booleanisCompletedAbnormally() 如果此任务抛出异常或被取消,返回 true 。
booleanisCompletedNormally() 如果此任务完成而不抛出异常并且未被取消,则返回 true 。
booleanisDone() 返回 true如果任务已完成。
Vjoin() 当 is done返回计算结果。
protected static ForkJoinTaskpeekNextLocalTask() 返回,但不会取消调度或执行当前线程排队但尚未执行的任务(如果可以立即可用)。
protected static ForkJoinTaskpollNextLocalTask() 如果当前线程正在ForkJoinPool中运行,则不执行当前线程排队的下一个任务但尚未执行的时间并返回。
protected static ForkJoinTaskpollTask() 如果当前线程在ForkJoinPool中运行,则不执行下一个任务,返回当前线程排队的下一个任务,但尚未执行,如果一个可用,或者如果不可用,则由其他线程分派的任务,如果可供使用的话。
voidquietlyComplete() 正常完成此任务而不设置值。
voidquietlyInvoke() 执行此任务并等待其完成(如有必要),而不返回其结果或抛出异常。
voidquietlyJoin() 加入此任务,而不返回其结果或抛出异常。
voidreinitialize() 重置此任务的内部簿记状态,允许随后的 fork 。
shortsetForkJoinTaskTag(short tag) 原子地设置此任务的标签值。
protected abstract voidsetRawResult(V value) 强制给定的值作为结果返回。
booleantryUnfork() 尝试取消执行此任务。
package java.util.concurrent; import java.io.Serializable; import java.util.Collection; import java.util.List; import java.util.RandomAccess; import java.lang.ref.WeakReference; import java.lang.ref.ReferenceQueue; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RunnableFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.locks.ReentrantLock; import java.lang.reflect.Constructor; public abstract class ForkJoinTask implements Future, Serializable { volatile int status; // accessed directly by pool and workers static final int DONE_MASK = 0xf0000000; // mask out non-completion bits static final int NORMAL = 0xf0000000; // must be negative static final int CANCELLED = 0xc0000000; // must be < NORMAL static final int EXCEPTIONAL = 0x80000000; // must be < CANCELLED static final int SIGNAL = 0x00010000; // must be >= 1 >> 16) != 0) synchronized (this) { notifyAll(); } return completion; } } } final int doExec() { int s; boolean completed; if ((s = status) >= 0) { try { completed = exec(); } catch (Throwable rex) { return setExceptionalCompletion(rex); } if (completed) s = setCompletion(NORMAL); } return s; } final void internalWait(long timeout) { int s; if ((s = status) >= 0 && // force completer to issue notify U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { synchronized (this) { if (status >= 0) try { wait(timeout); } catch (InterruptedException ie) { } else notifyAll(); } } } private int externalAwaitDone() { int s = ((this instanceof CountedCompleter) ? // try helping ForkJoinPool.common.externalHelpComplete( (CountedCompleter)this, 0) : ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0); if (s >= 0 && (s = status) >= 0) { boolean interrupted = false; do { if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { synchronized (this) { if (status >= 0) { try { wait(0L); } catch (InterruptedException ie) { interrupted = true; } } else notifyAll(); } } } while ((s = status) >= 0); if (interrupted) Thread.currentThread().interrupt(); } return s; } private int externalInterruptibleAwaitDone() throws InterruptedException { int s; if (Thread.interrupted()) throw new InterruptedException(); if ((s = status) >= 0 && (s = ((this instanceof CountedCompleter) ? ForkJoinPool.common.externalHelpComplete( (CountedCompleter)this, 0) : ForkJoinPool.common.tryExternalUnpush(this) ? doExec() : 0)) >= 0) { while ((s = status) >= 0) { if (U.compareAndSwapInt(this, STATUS, s, s | SIGNAL)) { synchronized (this) { if (status >= 0) wait(0L); else notifyAll(); } } } } return s; } private int doJoin() { int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w; return (s = status) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (w = (wt = (ForkJoinWorkerThread)t).workQueue). tryUnpush(this) && (s = doExec()) < 0 ? s : wt.pool.awaitJoin(w, this, 0L) : externalAwaitDone(); } private int doInvoke() { int s; Thread t; ForkJoinWorkerThread wt; return (s = doExec()) < 0 ? s : ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (wt = (ForkJoinWorkerThread)t).pool. awaitJoin(wt.workQueue, this, 0L) : externalAwaitDone(); } // Exception table support private static final ExceptionNode[] exceptionTable; private static final ReentrantLock exceptionTableLock; private static final ReferenceQueue exceptionTableRefQueue; private static final int EXCEPTION_MAP_CAPACITY = 32; static final class ExceptionNode extends WeakReference noArgCtor = null; Constructor[] cs = ec.getConstructors();// public ctors only for (int i = 0; i < cs.length; ++i) { Constructor c = cs[i]; Class[] ps = c.getParameterTypes(); if (ps.length == 0) noArgCtor = c; else if (ps.length == 1 && ps[0] == Throwable.class) { Throwable wx = (Throwable)c.newInstance(ex); return (wx == null) ? ex : wx; } } if (noArgCtor != null) { Throwable wx = (Throwable)(noArgCtor.newInstance()); if (wx != null) { wx.initCause(ex); return wx; } } } catch (Exception ignore) { } } return ex; } private static void expungeStaleExceptions() { for (Object x; (x = exceptionTableRefQueue.poll()) != null;) { if (x instanceof ExceptionNode) { int hashCode = ((ExceptionNode)x).hashCode; ExceptionNode[] t = exceptionTable; int i = hashCode & (t.length - 1); ExceptionNode e = t[i]; ExceptionNode pred = null; while (e != null) { ExceptionNode next = e.next; if (e == x) { if (pred == null) t[i] = next; else pred.next = next; break; } pred = e; e = next; } } } } static final void helpExpungeStaleExceptions() { final ReentrantLock lock = exceptionTableLock; if (lock.tryLock()) { try { expungeStaleExceptions(); } finally { lock.unlock(); } } } static void rethrow(Throwable ex) { if (ex != null) ForkJoinTask.uncheckedThrow(ex); } @SuppressWarnings("unchecked") static void uncheckedThrow(Throwable t) throws T { throw (T)t; // rely on vacuous cast } private void reportException(int s) { if (s == CANCELLED) throw new CancellationException(); if (s == EXCEPTIONAL) rethrow(getThrowableException()); } // public methods public final ForkJoinTask fork() { Thread t; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ((ForkJoinWorkerThread)t).workQueue.push(this); else ForkJoinPool.common.externalPush(this); return this; } public final V join() { int s; if ((s = doJoin() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); } public final V invoke() { int s; if ((s = doInvoke() & DONE_MASK) != NORMAL) reportException(s); return getRawResult(); } public static void invokeAll(ForkJoinTask t1, ForkJoinTask t2) { int s1, s2; t2.fork(); if ((s1 = t1.doInvoke() & DONE_MASK) != NORMAL) t1.reportException(s1); if ((s2 = t2.doJoin() & DONE_MASK) != NORMAL) t2.reportException(s2); } public static void invokeAll(ForkJoinTask... tasks) { Throwable ex = null; int last = tasks.length - 1; for (int i = last; i >= 0; --i) { ForkJoinTask t = tasks[i]; if (t == null) { if (ex == null) ex = new NullPointerException(); } else if (i != 0) t.fork(); else if (t.doInvoke() < NORMAL && ex == null) ex = t.getException(); } for (int i = 1; i > Collection invokeAll(Collection tasks) { if (!(tasks instanceof RandomAccess) || !(tasks instanceof List)) { invokeAll(tasks.toArray(new ForkJoinTask[tasks.size()])); return tasks; } @SuppressWarnings("unchecked") List> ts = (List>) tasks; Throwable ex = null; int last = ts.size() - 1; for (int i = last; i >= 0; --i) { ForkJoinTask t = ts.get(i); if (t == null) { if (ex == null) ex = new NullPointerException(); } else if (i != 0) t.fork(); else if (t.doInvoke() < NORMAL && ex == null) ex = t.getException(); } for (int i = 1; i peekNextLocalTask() { Thread t; ForkJoinPool.WorkQueue q; if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) q = ((ForkJoinWorkerThread)t).workQueue; else q = ForkJoinPool.commonSubmitterQueue(); return (q == null) ? null : q.peek(); } protected static ForkJoinTask pollNextLocalTask() { Thread t; return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? ((ForkJoinWorkerThread)t).workQueue.nextLocalTask() : null; } protected static ForkJoinTask pollTask() { Thread t; ForkJoinWorkerThread wt; return ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ? (wt = (ForkJoinWorkerThread)t).pool.nextTaskFor(wt.workQueue) : null; } // tag operations public final short getForkJoinTaskTag() { return (short)status; } public final short setForkJoinTaskTag(short tag) { for (int s;;) { if (U.compareAndSwapInt(this, STATUS, s = status, (s & ~SMASK) | (tag & SMASK))) return (short)s; } } public final boolean compareAndSetForkJoinTaskTag(short e, short tag) { for (int s;;) { if ((short)(s = status) != e) return false; if (U.compareAndSwapInt(this, STATUS, s, (s & ~SMASK) | (tag & SMASK))) return true; } } static final class AdaptedRunnable extends ForkJoinTask implements RunnableFuture { final Runnable runnable; T result; AdaptedRunnable(Runnable runnable, T result) { if (runnable == null) throw new NullPointerException(); this.runnable = runnable; this.result = result; // OK to set this even before completion } public final T getRawResult() { return result; } public final void setRawResult(T v) { result = v; } public final boolean exec() { runnable.run(); return true; } public final void run() { invoke(); } private static final long serialVersionUID = 5232453952276885070L; } static final class AdaptedRunnableAction extends ForkJoinTask implements RunnableFuture { final Runnable runnable; AdaptedRunnableAction(Runnable runnable) { if (runnable == null) throw new NullPointerException(); this.runnable = runnable; } public final Void getRawResult() { return null; } public final void setRawResult(Void v) { } public final boolean exec() { runnable.run(); return true; } public final void run() { invoke(); } private static final long serialVersionUID = 5232453952276885070L; } static final class RunnableExecuteAction extends ForkJoinTask { final Runnable runnable; RunnableExecuteAction(Runnable runnable) { if (runnable == null) throw new NullPointerException(); this.runnable = runnable; } public final Void getRawResult() { return null; } public final void setRawResult(Void v) { } public final boolean exec() { runnable.run(); return true; } void internalPropagateException(Throwable ex) { rethrow(ex); // rethrow outside exec() catches. } private static final long serialVersionUID = 5232453952276885070L; } static final class AdaptedCallable extends ForkJoinTask implements RunnableFuture { final Callable adapt(Runnable runnable) { return new AdaptedRunnableAction(runnable); } public static ForkJoinTask adapt(Runnable runnable, T result) { return new AdaptedRunnable(runnable, result); } public static ForkJoinTask adapt(Callable> c)从调度队列中删除所有可用的未执行的提交和分派任务,并将其添加到给定集合中,而不会更改其执行状态。
voidexecute(ForkJoinTask task)为异步执行给定任务的排列。
voidexecute(Runnable task)在将来的某个时间执行给定的命令。
intgetActiveThreadCount()返回当前正在窃取或执行任务的线程数的估计。
booleangetAsyncMode()返回
true如果此池使用本地先入先出调度模式,用于从未加入的分叉任务。static intgetCommonPoolParallelism()返回公共池的目标并行度级别。
ForkJoinPool.ForkJoinWorkerThreadFactorygetFactory()返回用于构建新工人的工厂。
intgetParallelism()返回此池的目标并行度级别。
intgetPoolSize()返回已启动但尚未终止的工作线程数。
intgetQueuedSubmissionCount()返回提交给此池尚未开始执行的任务数量的估计。
longgetQueuedTaskCount()返回由工作线程(但不包括提交到池中尚未开始执行的任务)当前在队列中保留的任务总数的估计值。
intgetRunningThreadCount()返回等待加入任务或其他受管同步的未阻止的工作线程数的估计。
longgetStealCount()返回从另一个线程的工作队列中偷取的任务总数的估计值。
Thread.UncaughtExceptionHandlergetUncaughtExceptionHandler()返回由于在执行任务时遇到不可恢复的错误而终止的内部工作线程的处理程序。
booleanhasQueuedSubmissions()返回
true如果有提交给该池尚未开始执行任何任务。Tinvoke(ForkJoinTask task)执行给定的任务,在完成后返回其结果。
ListinvokeAll(Collection task) { ForkJoinTask[] a; ForkJoinPool p; int b = base, s = top, n; if ((a = array) != null) { // ignore if queue removed int m = a.length - 1; // fenced write for task visibility U.putOrderedObject(a, ((m & s) [] oldA = array; int size = oldA != null ? oldA.length MAXIMUM_QUEUE_CAPACITY) throw new RejectedExecutionException("Queue capacity exceeded"); int oldMask, t, b; ForkJoinTask[] a = array = new ForkJoinTask[size]; if (oldA != null && (oldMask = oldA.length - 1) >= 0 && (t = top) - (b = base) > 0) { int mask = size - 1; do { // emulate poll from old array, push to new array ForkJoinTask x; int oldj = ((b & oldMask) )U.getObjectVolatile(a, j); if (base == b) { if (t != null) { if (U.compareAndSwapObject(a, j, t, null)) { base = b + 1; return t; } } else if (b + 1 == top) // now empty break; } } return null; } final ForkJoinTask nextLocalTask() { return (config & FIFO_QUEUE) == 0 ? pop() : poll(); } final ForkJoinTask peek() { ForkJoinTask[] a = array; int m; if (a == null || (m = a.length - 1) < 0) return null; int i = (config & FIFO_QUEUE) == 0 ? top - 1 : base; int j = ((i & m) t) { ForkJoinTask[] a; int s; if ((a = array) != null && (s = top) != base && U.compareAndSwapObject (a, (((a.length - 1) & --s) [] a = array; if (b - (s = top - 1) = 0) { if ((config & FIFO_QUEUE) == 0) { for (ForkJoinTask t;;) { if ((t = (ForkJoinTask)U.getAndSetObject (a, ((m & s) 0) break; } } else pollAndExecAll(); } } final void runTask(ForkJoinTask task) { if (task != null) { scanState &= ~SCANNING; // mark as busy (currentSteal = task).doExec(); U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC execLocalTasks(); ForkJoinWorkerThread thread = owner; if (++nsteals < 0) // collect on overflow transferStealCount(pool); scanState |= SCANNING; if (thread != null) thread.afterTopLevelExec(); } } final void transferStealCount(ForkJoinPool p) { AtomicLong sc; if (p != null && (sc = p.stealCounter) != null) { int s = nsteals; nsteals = 0; // if negative, correct for overflow sc.getAndAdd((long)(s < 0 ? Integer.MAX_VALUE : s)); } } final boolean tryRemoveAndExec(ForkJoinTask task) { ForkJoinTask[] a; int m, s, b, n; if ((a = array) != null && (m = a.length - 1) >= 0 && task != null) { while ((n = (s = top) - (b = base)) > 0) { for (ForkJoinTask t;;) { // traverse from s to b long j = ((--s & m)关注打赏
