您当前的位置: 首页 >  Java

cuiyaonan2000

暂无认证

  • 1浏览

    0关注

    248博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Java: Fork/Join使用

cuiyaonan2000 发布时间:2022-03-02 17:52:13 ,浏览量:1

序言

我们在Jdk8中使用Stream Api 和 CompletableFuture 的时候知道 他们都默认使用了ForkJoinPool线程池. 故此介绍下Fork/Join的框架.如题此框架是在Jdk7中引进的cuiyaonan2000@163.com

简介

Java.util.concurrent.ForkJoinPool由Java大师Doug Lea主持编写,它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出。

我们举个例子:如果要计算一个超大数组的和,最简单的做法是用一个循环在一个线程内完成:

┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘

还有一种方法,可以把数组拆成两部分,分别计算,最后加起来就是最终结果,这样可以用两个线程并行执行:

┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┴─┘

如果拆成两部分还是很大,我们还可以继续拆,用4个线程并行执行:

┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘
┌─┬─┬─┬─┬─┬─┐
└─┴─┴─┴─┴─┴─┘

这就是Fork/Join任务的原理:判断一个任务是否足够小,如果是,直接计算,否则,就分拆成几个小任务分别计算。这个过程可以反复“裂变”成一系列小任务。最终得到一个结果返还给调用者.

API

首先我们可以直接获取JDK默认的一个静态线程池,同时也可以获取该线程池的线程数

ForkJoinPool.commonPool();
ForkJoinPool.getCommonPoolParallelism();

 

通过Api我们可以看到ForkJoinPoo的主要方法如下图所示:

通过如上的内容我们可以看到ForkJoinPool提交任务主要分为如下3类

1)execute()方法 : 没有返回内容2)invoke()方法 :  阻塞到任务返回结果3)Submit()方法 : 异步,先返回一个Future对象,用来在需要的时候获取结果

另:submit返回的是ForkJoinTask,但是它实现了Future.cuiyaonan2000@163.com

用例

稍微,稍微有这么点无奈的就是,ForkJoinPool 不能自动拆分任务.貌似也不可.只能我们自己在创建任务的时候 拆分多个线程然后使用join()方法来等子任务执行完毕了在统一进行收回.cuiyaonan2000@163.com

如此这般我个人感觉ForkJoinPool的主要特点可能就是任务窃取这个东西cuiyaonan2000@163.com

这里使用的例子是继承了递归的如下的两个抽象类:

  1. RecursiveAction : 继承了抽象类ForkJoinTask,没有返回值的递归抽象类,(需要自己拆分任务,然后join())
  2. RecursiveTask : 继承了抽象类ForkJoinTask, 有返回值的递归抽象类,(需要自己拆分任务,然后join())
package cui.yao.nan.highlevetest.forkjoin;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;

/**
 * @Author: cuiyaonan2000@163.com
 * @Description: todo
 * @Date: Created at 2022-3-3  10:53
 */
public class Test2 {

    public static class RecursiveActionImpl extends RecursiveAction {

        private List array;

        public RecursiveActionImpl(List array) {
            this.array = array;
        }

        @Override
        protected void compute() {

            if (array.size() > 50) {
                List jjList = array.subList(50, array.size());
                RecursiveActionImpl jj = new RecursiveActionImpl(jjList);
                invokeAll(jj);
                jj.join();
            }

            int resutl = 0;

            for (int i = 0; i < 50; i++) {
                resutl += array.get(i);
            }

            System.out.println(Thread.currentThread().getName() + "计算的结果是:" + resutl);

        }
    }


    public static void main(String[] args) throws ExecutionException, InterruptedException {

        List tempList = new ArrayList();

        Random random = new Random();

        int result = 0;

        for (int i = 0; i < 100; i++) {

            int tempInt = random.nextInt(2000);

            result += tempInt;

            tempList.add(tempInt);

        }

        System.out.println("我是Main方法计算的最正确的结果:"+result);

        ForkJoinPool forJoinPool = new ForkJoinPool(3);

        forJoinPool.invoke(new RecursiveActionImpl(tempList));


        System.out.println("-------------------------------------------");


        System.out.println(forJoinPool.submit(new RecursiveTaskImpl(tempList)).get());

    }

    public static class RecursiveTaskImpl extends RecursiveTask {

        private List array;

        public RecursiveTaskImpl(List array) {
            this.array = array;
        }

        @Override
        protected Integer compute() {

            int theResult = 0;
            if (array.size() > 50) {
                List jjList = array.subList(50, array.size());
                RecursiveTaskImpl jj = new RecursiveTaskImpl(jjList);
                invokeAll(jj);
                theResult = jj.join();
            }

            int resutl = 0;

            for (int i = 0; i < 50; i++) {
                resutl += array.get(i);
            }

            return resutl +theResult;
        }
    }


}

关注
打赏
1638267374
查看更多评论
立即登录/注册

微信扫码登录

0.0367s