我们在Jdk8中使用Stream Api 和 CompletableFuture 的时候知道 他们都默认使用了ForkJoinPool线程池. 故此介绍下Fork/Join的框架.如题此框架是在Jdk7中引进的cuiyaonan2000@163.com
简介Java.util.concurrent.ForkJoinPool由Java大师Doug Lea主持编写,它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。
我们举个例子:如果要计算一个超大数组的和,最简单的做法是用一个循环在一个线程内完成:
┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
还有一种方法,可以把数组拆成两部分,分别计算,最后加起来就是最终结果,这样可以用两个线程并行执行:
┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
如果拆成两部分还是很大,我们还可以继续拆,用4个线程并行执行:
┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘
这就是Fork/Join任务的原理:判断一个任务是否足够小,如果是,直接计算,否则,就分拆成几个小任务分别计算。这个过程可以反复“裂变”成一系列小任务。最终得到一个结果返还给调用者.
API首先我们可以直接获取JDK默认的一个静态线程池,同时也可以获取该线程池的线程数
ForkJoinPool.commonPool();
ForkJoinPool.getCommonPoolParallelism();
通过Api我们可以看到ForkJoinPoo的主要方法如下图所示:
通过如上的内容我们可以看到ForkJoinPool提交任务主要分为如下3类
1)execute()方法 : 没有返回内容2)invoke()方法 : 阻塞到任务返回结果3)Submit()方法 : 异步,先返回一个Future对象,用来在需要的时候获取结果
另:submit返回的是ForkJoinTask,但是它实现了Future.cuiyaonan2000@163.com
稍微,稍微有这么点无奈的就是,ForkJoinPool 不能自动拆分任务.貌似也不可.只能我们自己在创建任务的时候 拆分多个线程然后使用join()方法来等子任务执行完毕了在统一进行收回.cuiyaonan2000@163.com
如此这般我个人感觉ForkJoinPool的主要特点可能就是任务窃取这个东西cuiyaonan2000@163.com
这里使用的例子是继承了递归的如下的两个抽象类:
-
RecursiveAction : 继承了抽象类ForkJoinTask,没有返回值的递归抽象类,(需要自己拆分任务,然后join())
-
RecursiveTask : 继承了抽象类ForkJoinTask, 有返回值的递归抽象类,(需要自己拆分任务,然后join())
package cui.yao.nan.highlevetest.forkjoin;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;
/**
* @Author: cuiyaonan2000@163.com
* @Description: todo
* @Date: Created at 2022-3-3 10:53
*/
public class Test2 {
public static class RecursiveActionImpl extends RecursiveAction {
private List array;
public RecursiveActionImpl(List array) {
this.array = array;
}
@Override
protected void compute() {
if (array.size() > 50) {
List jjList = array.subList(50, array.size());
RecursiveActionImpl jj = new RecursiveActionImpl(jjList);
invokeAll(jj);
jj.join();
}
int resutl = 0;
for (int i = 0; i < 50; i++) {
resutl += array.get(i);
}
System.out.println(Thread.currentThread().getName() + "计算的结果是:" + resutl);
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
List tempList = new ArrayList();
Random random = new Random();
int result = 0;
for (int i = 0; i < 100; i++) {
int tempInt = random.nextInt(2000);
result += tempInt;
tempList.add(tempInt);
}
System.out.println("我是Main方法计算的最正确的结果:"+result);
ForkJoinPool forJoinPool = new ForkJoinPool(3);
forJoinPool.invoke(new RecursiveActionImpl(tempList));
System.out.println("-------------------------------------------");
System.out.println(forJoinPool.submit(new RecursiveTaskImpl(tempList)).get());
}
public static class RecursiveTaskImpl extends RecursiveTask {
private List array;
public RecursiveTaskImpl(List array) {
this.array = array;
}
@Override
protected Integer compute() {
int theResult = 0;
if (array.size() > 50) {
List jjList = array.subList(50, array.size());
RecursiveTaskImpl jj = new RecursiveTaskImpl(jjList);
invokeAll(jj);
theResult = jj.join();
}
int resutl = 0;
for (int i = 0; i < 50; i++) {
resutl += array.get(i);
}
return resutl +theResult;
}
}
}