您当前的位置: 首页 > 

Dongguo丶

暂无认证

  • 2浏览

    0关注

    472博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

CompletableFuture异步回调

Dongguo丶 发布时间:2021-09-19 20:30:03 ,浏览量:2

Future

Future接口定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。比如我们将任务提交到线程池里面,然后我们会得到一个Futrue,在Future里面有isDone方法来 判断任务是否处理结束,还有get方法可以一直阻塞直到任务结束然后获取结果,(最好放在最后)但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。

Future的主要缺点如下:

(1)不支持手动完成 我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果,现在没法把这个任务结果通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成 (2)不支持进一步的非阻塞调用 通过Future的get方法会一直阻塞到任务完成,但是想在获取任务之后执行额外的任务,因为Future不支持回调函数,所以无法实现这个功能 (3)不支持链式调用 对于Future的执行结果,我们想继续传到下一个Future处理使用,从而形成一个链式的pipline调用,这在Future中是没法实现的。 (4)不支持多个Future合并 比如我们有10个Future并行执行,我们想在所有的Future运行完毕之后,执行某些函数,是没法通过Future实现的。 (5)不支持异常处理 Future的API没有任何的异常处理的api,所以在异步运行时,如果出了问题是不好定位的。

以上这些场景Future无法完成,需要使用到CompletableFuture

CompletableFuture

CompletableFuture在Java8里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。

CompletableFuture实现了Future, CompletionStage接口,实现了Future接口就可以兼容现在有线程池框架,而CompletionStage接口才是异步编程的接口抽象,里面定义多种异步方法,可以通过回调的方式处理计算结果,也可以提供转换和组合CompletableFuture的方法。

通过这两者集合,从而打造出了强大的CompletableFuture类。

image-20210905085524623

public class CompletableFuture implements Future, CompletionStage {}

CompletableFuture实现了FutureTask相同的功能并解决了FutureTask的痛点问题,提供了更强大的异步编排功能

CompletionStage接口

image-20210905085941751

代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段,有些类似Linux系统的管道分隔符传参数。

ps -ef|grep java   

​ ps -ef 先查询一次所有列表

​ grep java 再查出java的列表

使用CompletableFuture

场景:主线程里面创建一个CompletableFuture,然后主线程调用get方法会阻塞,执行完子线程再执行主线程

get()与join()的区别

get需要处理异常

join不会抛出异常

都会阻塞 建议使用join,使用join能少抛异常 因为CompletableFuture提供了处理异常的API

get()、 get(long timeout, TimeUnit unit)、getNow(T valueIfAbsent)
package com.dongguo.completable;

import java.util.concurrent.*;

/**
 * @author Dongguo
 * @date 2021/9/5 0005-13:41
 * @description:
 */
public class CompletableFutureAPIDemo {
    public static void main(String[] args) {
        //自定义线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2,
                5,
                1L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue(10));

        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        }, threadPoolExecutor);

        try {
//            System.out.println(future.get());//不见不散
//            System.out.println(future.get(1,TimeUnit.SECONDS));//过时不候  超时退出
            System.out.println(future.getNow(999));//立即获取结果,如果没有计算完,返回设置的默认值
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPoolExecutor.shutdown();
        }
    }
}
complete(T value) 是否打断get方法立即返回括号值

一般与get()共同存在

打断成功返回默认值

打断失败返回计算的值

package com.dongguo.completable;

import java.util.concurrent.*;

/**
 * @author Dongguo
 * @date 2021/9/5 0005-13:41
 * @description:
 */
public class CompletableFutureAPIDemo {
    public static void main(String[] args) {
        //自定义线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2,
                5,
                1L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue(10));

        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        }, threadPoolExecutor);

        try {
            System.out.println(future.complete(-1) + "\t" + future.get());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPoolExecutor.shutdown();
        }
    }
}
运行结果:
true	-1

打断成功返回默认值

package com.dongguo.completable;

import java.util.concurrent.*;

/**
 * @author Dongguo
 * @date 2021/9/5 0005-13:41
 * @description:
 */
public class CompletableFutureAPIDemo {
    public static void main(String[] args) {
        //自定义线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2,
                5,
                1L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue(10));

        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        }, threadPoolExecutor);

        try {
            TimeUnit.SECONDS.sleep(3);
            System.out.println(future.complete(-1) + "\t" + future.get());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPoolExecutor.shutdown();
        }
    }
}
运算结果
false	1

打断失败 返回计算的值

没有返回值的异步任务runAsync

public static CompletableFuture runAsync(Runnable runnable)

package com.dongguo.completable;

import java.util.concurrent.CompletableFuture;

/**
 * @author Dongguo
 * @date 2021/8/24 0024-22:36
 * @description: 没有返回值的异步任务
 */
public class CompletableFutureDemo1 {
    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        //运行一个没有返回值的异步任务
        CompletableFuture future = CompletableFuture.runAsync(() -> {
            try {
                System.out.println("子线程启动");
                Thread.sleep(5000);
                System.out.println("子线程完成");
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        //主线程阻塞
        future.join();
        System.out.println("主线程结束");
    }
}
运行结果:
主线程开始
子线程启动
子线程完成
主线程结束

返回值类型CompletableFuture 参数是Void 相当于void的包装类

public final
class Void {

使用join

package com.dongguo.completable;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author Dongguo
 * @date 2021/8/24 0024-22:36
 * @description: 没有返回值的异步任务
 */
public class CompletableFutureDemo1 {
    public static void main(String[] args) {
        //自定义线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2,
                5,
                1L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue(10));
        try {
            System.out.println("主线程开始");
            //运行一个没有返回值的异步任务
            CompletableFuture future = CompletableFuture.runAsync(() -> {
                System.out.println("子线程启动");
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("子线程完成");
            }, threadPoolExecutor);
            future.join();
            System.out.println("主线程结束");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPoolExecutor.shutdown();
        }
    }
}
运行结果:
主线程开始
子线程启动
子线程完成
主线程结束

public static CompletableFuture runAsync(Runnable runnable,Executor executor)

Executor executor参数说明

没有指定Executor的方法,直接使用默认的ForkJoinPool.commonPool() 作为它的线程池执行异步代码。

如果指定线程池,则使用我们自定义的或者特别指定的线程池执行异步代码

注意:自定义线程池使用完后要关闭线程池

package com.dongguo.completable;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author Dongguo
 * @date 2021/8/24 0024-22:36
 * @description: 没有返回值的异步任务
 */
public class CompletableFutureDemo1 {
    public static void main(String[] args) {
        //自定义线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2,
                5,
                1L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue(10));
        try {
            System.out.println("主线程开始");
            //运行一个没有返回值的异步任务
            CompletableFuture future = CompletableFuture.runAsync(() -> {
                System.out.println("子线程启动");
                try {
                    TimeUnit.SECONDS.sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("子线程完成");
            }, threadPoolExecutor);
            //主线程阻塞
            future.get();
            System.out.println("主线程结束");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPoolExecutor.shutdown();
        }
    }
}
有返回值的异步任务supplyAsync

public static CompletableFuture supplyAsync(Supplier supplier)

package com.dongguo.completable;

import java.util.concurrent.CompletableFuture;

/**
 * @author Dongguo
 * @date 2021/8/25 0025-7:56
 * @description: 有返回值的异步任务
 */
public class CompletableFutureDemo2 {
    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("子线程启动");
                Thread.sleep(5000);

            } catch (Exception e) {
                e.printStackTrace();
            }
            return "子线程完成";
        });
        //主线程阻塞
        String result = future.join();
        System.out.println("主线程结束,子线程结果为:" + result);
    }
}
运行结果:
主线程开始
子线程启动
主线程结束,子线程结果为:子线程完成

public static CompletableFuture supplyAsync(Supplier supplier,Executor executor)

package com.dongguo.completable;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author Dongguo
 * @date 2021/8/25 0025-7:56
 * @description: 有返回值的异步任务
 */
public class CompletableFutureDemo2 {
    public static void main(String[] args) throws Exception {
        //自定义线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2,
                5,
                1L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue(10));
        System.out.println("主线程开始");
        //运行一个没有返回值的异步任务
        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("子线程启动");
                Thread.sleep(5000);

            } catch (Exception e) {
                e.printStackTrace();
            }
            return "子线程完成";
        },threadPoolExecutor);
        //主线程阻塞
        String result = future.join();
        System.out.println("主线程结束,子线程结果为:" + result);
        threadPoolExecutor.shutdown();
    }
}

future.get();依然会阻塞主线程

线程依赖thenApply 由于存在依赖关系(当前一步出错,不走下一步),当前步骤有异常的话就叫停。

当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。

package com.dongguo.completable;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author Dongguo
 * @date 2021/8/25 0025-8:03
 * @description: 先对一个数加10, 然后取平方
 */
public class CompletableFutureDemo3 {
    private static Integer num = 10;

    public static void main(String[] args) throws Exception {
        //自定义线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2,
                5,
                1L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue(10));
        System.out.println("主线程开始");
        //运行一个没有返回值的异步任务
        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("子线程启动 执行加10");
                num += 10;
            } catch (Exception e) {
                e.printStackTrace();
            }
            return num;
        }).thenApply((i) -> {
            System.out.println("子线程启动 执行平方");
            return i * i;
        }).whenComplete((v, e) -> {
            if (e == null) {
                System.out.println("result:"+v);
            }
        }).exceptionally((e) -> {
            e.printStackTrace();
            return null;
        });
        future.join();
        System.out.println("主线程结束");
        threadPoolExecutor.shutdown();
    }
}
主线程开始
子线程启动 执行加10
子线程启动 执行平方
子线程启动 执行减100
result:300
300
主线程结束

线程依赖不想串行可以使用异步执行thenApplyAsync

异常情况

package com.dongguo.completable;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author Dongguo
 * @date 2021/8/25 0025-8:03
 * @description:
 */
public class CompletableFutureDemo3 {
    private static Integer num = 10;

    public static void main(String[] args) throws Exception {
        //自定义线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2,
                5,
                1L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue(10));
        System.out.println("主线程开始");
        //运行一个没有返回值的异步任务
        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
            System.out.println("子线程启动 执行加10");
            num += 10;
            return num;
        }).thenApply((i) -> {
            System.out.println("子线程启动 执行平方");
            int m = 10 / 0;//人为异常
            return i * i;
        }).thenApply((i) -> {
            System.out.println("子线程启动 执行减100");
            return i - 100;
        }).whenComplete((v, e) -> {
            if (e == null) {
                System.out.println("result:" + v);
            }
        }).exceptionally((e) -> {
            e.printStackTrace();
            return null;
        });
        System.out.println(future.join());
        System.out.println("主线程结束");
        threadPoolExecutor.shutdown();
    }
}
主线程开始
子线程启动 执行加10
子线程启动 执行平方
null
主线程结束
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:618)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
	at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:631)
	at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
	at com.dongguo.completable.CompletableFutureDemo3.main(CompletableFutureDemo3.java:30)
Caused by: java.lang.ArithmeticException: / by zero
	at com.dongguo.completable.CompletableFutureDemo3.lambda$main$1(CompletableFutureDemo3.java:32)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
	... 4 more

出现异常,不再计算之后的运算

handle有异常也可以往下一步走,根据带的异常参数可以进一步处理
package com.dongguo.completable;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author Dongguo
 * @date 2021/8/25 0025-8:03
 * @description:
 */
public class CompletableFutureDemo3 {
    private static Integer num = 10;

    public static void main(String[] args) throws Exception {
        //自定义线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2,
                5,
                1L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue(10));
        System.out.println("主线程开始");
        //运行一个没有返回值的异步任务
        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
            System.out.println("子线程启动 执行加10");
            num += 10;
            return num;
        }).handle((i, e) -> {
            System.out.println("子线程启动 执行平方");
            int m = 10 / 0;//人为异常
            return i * i;
        }).handle((i, e) -> {
            System.out.println("子线程启动 执行减100");
            return i - 100;
        }).whenComplete((v, e) -> {
            if (e == null) {
                System.out.println("result:" + v);
            }
        }).exceptionally((e) -> {
            e.printStackTrace();
            return null;
        });
        System.out.println(future.join());
        System.out.println("主线程结束");
        threadPoolExecutor.shutdown();
    }
}
主线程开始
子线程启动 执行加10
子线程启动 执行平方
子线程启动 执行减100
null
主线程结束
java.util.concurrent.CompletionException: java.lang.NullPointerException
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:838)
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
	at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: java.lang.NullPointerException
	at com.dongguo.completable.CompletableFutureDemo3.lambda$main$2(CompletableFutureDemo3.java:36)
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
	... 8 more

出现异常,计算仍然执行

成功执行whenComplete返回结果和异常
package com.dongguo.completable;

import java.util.concurrent.CompletableFuture;

/**
 * @author Dongguo
 * @date 2021/8/25 0025-8:12
 * @description:
 */
public class CompletableFutureDemo {
    private static Integer num = 10;

    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        //运行一个没有返回值的异步任务
        CompletableFuture future = CompletableFuture.supplyAsync(() -> {

            System.out.println("子线程启动");
            num += 10;
            return num;
        }).thenApply((f)->{
            return  f+1;
        }).whenComplete((v,e)->{
            if (e == null){
                System.out.println("result= "+v);
            }
        });
    }
}
运行结果:
主线程开始
子线程启动

为什么没有打印出result呢

注意 因为是异步执行,子线程在处理任务,主线程却执行结束了,

这样所有的任务都被迫结束退出了。

解决 要保证主线程执行的时间要超过子线程执行的时间

package com.dongguo.completable;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/**
 * @author Dongguo
 * @date 2021/8/25 0025-8:12
 * @description:
 */
public class CompletableFutureDemo {
    private static Integer num = 10;

    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        //运行一个没有返回值的异步任务
        CompletableFuture future = CompletableFuture.supplyAsync(() -> {

            System.out.println("子线程启动");
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            num += 10;
            return num;
        }).thenApply((f) -> {
            return f + 1;
        }).whenComplete((v, e) -> {
            if (e == null) {
                System.out.println("result= " + v);
            }
        });
        //主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
运行结果
主线程开始
子线程启动
result= 21
消费处理结果thenAccept

thenAccept 消费处理结果, 接收任务的处理结果,并消费处理,无返回结果。

package com.dongguo.completable;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/**
 * @author Dongguo
 * @date 2021/8/25 0025-8:08
 * @description: 消费处理结果thenAccept
 */
public class CompletableFutureDemo4 {
    private static Integer num = 10;

    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("子线程启动 执行加10");
                num += 10;
            } catch (Exception e) {
                e.printStackTrace();
            }
            return num;
        }).thenApply((i) -> {
            return i * i;
        }).thenAccept(new Consumer(){

            @Override
            public void accept(Integer integer) {
                System.out.println("子线程全部处理完成,最后调用了accept,结果为:" + integer);
            }
        });
  
        System.out.println("主线程结束");
    }
}
运行结果:
主线程开始
子线程启动 执行加10
子线程全部处理完成,最后调用了accept,结果为:400
主线程结束
异常处理exceptionally、handle

exceptionally异常处理,出现异常时触发

package com.dongguo.completable;

import java.util.concurrent.CompletableFuture;

/**
 * @author Dongguo
 * @date 2021/8/25 0025-8:12
 * @description:
 */
public class CompletableFutureDemo5 {
    private static Integer num = 10;

    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        CompletableFuture future = CompletableFuture.supplyAsync(() -> {

            System.out.println("子线程启动");
            int i = 1 / 0;
            num += 10;
            return num;
        }).exceptionally((e) -> {
            System.out.println(e.getMessage());
            return -1;
        });
        //主线程阻塞
        int result = future.join();
        System.out.println("主线程结束,子线程结果为:" + result);
    }
}
运行结果:
主线程开始
子线程启动
java.lang.ArithmeticException: / by zero
主线程结束,子线程结果为:-1

handle类似于thenAccept/thenRun方法,是最后一步的处理调用,但是同时可以处理异常

package com.dongguo.completable;

import java.util.concurrent.CompletableFuture;

/**
 * @author Dongguo
 * @date 2021/8/25 0025-8:16
 * @description:
 */
public class CompletableFutureDemo6 {
    private static Integer num = 10;

    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        CompletableFuture future = CompletableFuture.supplyAsync(() -> {

            System.out.println("子线程启动");
            int i = 1 / 0;
            num += 10;
            return num;
        }).handle((i, e) -> {
            if (e != null) {
                System.out.println("发生了异常,内容为:" + e.getMessage());
                return -1;
            } else {
                System.out.println("正常完成,内容为: " + i);
                return i;
            }
        });
        //主线程阻塞
        int result = future.join();
        System.out.println("主线程结束,子线程结果为:" + result);
    }
}
主线程开始
子线程启动
发生了异常,内容为:java.lang.ArithmeticException: / by zero
主线程结束,子线程结果为:-1
结果合并thenCompose、thenCombine

thenCompose合并两个有依赖关系的CompletableFutures的执行结果

package com.dongguo.completable;

import java.util.concurrent.CompletableFuture;

/**
 * @author Dongguo
 * @date 2021/8/25 0025-8:25
 * @description:
 */
public class CompletableFutureDemo7 {
    private static Integer num = 10;

    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
            System.out.println("子线程启动");
            num += 10;
            return num;
        });
        //合并
        CompletableFuture future2 = future.thenCompose(i ->
                CompletableFuture.supplyAsync(() -> {
                    return i + 1;
                }));
        //主线程阻塞
        int result = future.join();
        int result2 = future2.join();

        System.out.println("主线程结束,子线程结果为:" + result);
        System.out.println("合并后结果为:"+result2);
    }
}
运行结果:
主线程开始
子线程启动
主线程结束,子线程结果为:20
合并后结果为:21

thenCombine合并两个没有依赖关系的CompletableFutures任务

案例1简化版

package com.dongguo.completable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

/**
 * @author Dongguo
 * @date 2021/8/25 0025-8:30
 * @description:
 */
public class CompletableFutureAPIDemo1 {

    public static void main(String[] args) throws Exception {
   
        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
            return 10;
        }).thenCombine(CompletableFuture.supplyAsync(() -> {
            return 20;
        }), (r1, r2) -> {
            return r1 + r2;
        });
        System.out.println("合并结果为" + future.join());
    }
}
运行结果:
合并结果为30

案例2标准版

package com.dongguo.completable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

/**
 * @author Dongguo
 * @date 2021/8/25 0025-8:30
 * @description:
 */
public class CompletableFutureDemo8 {
    private static Integer num = 10;

    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        //运行一个没有返回值的异步任务
        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
            System.out.println("子线程启动执行加10");
            num += 10;
            return num;
        });
        CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("子线程启动执行乘10");
            num = num * 10;
            return num;
        });
        //合并两个结果
        CompletableFuture futureTotal = future.thenCombine(future2, (f1, f2) -> {
            List list = new ArrayList();
            list.add(f1);
            list.add(f2);
            return list;
        });
        System.out.println("子线程启动执行加10结果为" + future.join());
        System.out.println("子线程启动执行乘10结果为" + future2.join());
        System.out.println("主线程结束,合并后结果为:" + futureTotal.join());
    }
}
运行结果:
主线程开始
子线程启动执行加10
子线程启动执行乘10
子线程启动执行加10结果为20
子线程启动执行乘10结果为200
主线程结束,合并后结果为:[20, 200]
合并多个任务的结果allOf与anyOf

allOf: 一系列独立的future任务,等其所有的任务执行完后做一些事情

package com.dongguo.completable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

/**
 * @author Dongguo
 * @date 2021/8/25 0025-8:39
 * @description:
 */
public class CompletableFutureDemo9 {
    private static Integer num = 10;

    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        List list = new ArrayList();
        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("子线程启动执行加10");
            num += 10;
            return num;
        });
        list.add(future);
        CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("子线程启动执行乘10");
            num = num * 10;
            return num;
        });
        list.add(future2);
        CompletableFuture future3 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("子线程启动执行减10");
            num -= 10;
            return num;
        });
        list.add(future3);
        CompletableFuture future4 = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(4);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("子线程启动执行除以10");
            num = num / 10;
            return num;
        });
        list.add(future4);
        //多任务合并
//        List collect = list.stream()
//                .map(CompletableFuture::join)
//                .collect(Collectors.toList());
//        System.out.println("主线程结束,合并后结果为:" + collect);

        CompletableFuture allOfFuture = CompletableFuture.allOf(future, future2, future3, future4);
        /*
        * 因为allof没有返回值,所以通过theApply,
        * 给allOfFuture附上一个回调函数。在回调函数里面,以此调用么一个Future的Get()函数,
        * 获取到4个结果,存入List*/
        CompletableFuture completableFuture = allOfFuture.thenApply((v) -> {
            return list.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        });
        System.out.println("主线程结束,合并后结果为:" + completableFuture.join());
    }
}
运行结果:
主线程开始
子线程启动执行加10
子线程启动执行乘10
子线程启动执行减10
子线程启动执行除以10
主线程结束,合并后结果为:[20, 200, 190, 19]

anyOf: 只要在多个future里面有一个返回,整个任务就可以结束,而不需要等到每一个future结束

package com.dongguo.completable;

import java.util.concurrent.CompletableFuture;


/**
 * @author Dongguo
 * @date 2021/8/25 0025-8:46
 * @description:
 */
public class CompletableFutureDemo10 {
    private static Integer num = 10;

    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        CompletableFuture[] futures = new CompletableFuture[4];

        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
                System.out.println("子线程启动执行加10");
                num += 10;
                return num;
            } catch (InterruptedException e) {
                e.printStackTrace();
                return 0;
            }
        });
        futures[0] = future;
        CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
                System.out.println("子线程启动执行乘10");
                num = num * 10;
                return num;
            } catch (InterruptedException e) {
                e.printStackTrace();
                return 0;
            }
        });
        futures[1] = future2;
        CompletableFuture future3 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
                System.out.println("子线程启动执行减10");
                num -= 10;
                return num;
            } catch (InterruptedException e) {
                e.printStackTrace();
                return 0;
            }
        });
        futures[2] = future3;
        CompletableFuture future4 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(4000);
                System.out.println("子线程启动执行除以10");
                num = num / 10;
                return num;
            } catch (InterruptedException e) {
                e.printStackTrace();
                return 0;
            }
        });
        futures[3] = future4;
        //多任务合并 anyOf有一个任务完成就返回  future2先执行完 返回future2执行的结果10*10=100
        CompletableFuture futureOne = CompletableFuture.anyOf(futures);
        System.out.println("主线程结束,合并后结果为:" + futureOne.join());
    }
}
运行结果:
主线程开始
子线程启动执行乘10
主线程结束,合并后结果为:100
applyToEither谁快用谁 最快返回输出的线程结果作为下一次任务的输入
package com.dongguo.completable;

import java.util.concurrent.*;

/**
 * @author Dongguo
 * @date 2021/9/5 0005-13:41
 * @description:
 */
public class CompletableFutureAPIDemo {
    public static void main(String[] args) {
        //自定义线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2,
                5,
                1L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue(10));

        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        },threadPoolExecutor).applyToEither(CompletableFuture.supplyAsync(()->{
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 10;
        }),(r)->{
            return r;
        });

        try {
            System.out.println(future.join());
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            threadPoolExecutor.shutdown();
        }
    }
}
运行结果
10
总结

1handle+whenComplete 相当于try-finally

exceptionally 相当于try-catch

2不带async的方法表示执行当前任务的线程执行接下的任务

带async的方法表示将接下来的任务交给线程池其他线程来进行执行

3get() join() 建议使用join 少抛异常

4使用默认线程池,要确保主线程运行时间超过子线程

使用自定义线程池,使用后要关闭线程池

5CompletableFuture任务执行三剑客

thenRun(Runnable runnable)

任务 A 执行完执行 B,并且 B 不需要 A 的结果

thenAccept(Consumer action)

任务 A 执行完执行 B,B 需要 A 的结果,但是任务 B 无返回值

thenApply(Function fn)

任务 A 执行完执行 B,B 需要 A 的结果,同时任务 B 有返回值

模拟代码:

System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());
 

System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {}).join());


System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB").join());
案例 电商比价需求

1同一款产品,同时搜索出同款产品在各大电商的售价;

2同·款产品,同时搜索出本产品在某一个电商平台下,各个入驻门店的售价是多少

出来结果希望是同款产品的在不同地方的价格清单列表,返回一个List

《mysql》 in jd price is 88.05

《mysql》 in pdd price is 86.11

《mysql》 in taobao price is 90.43

package com.dongguo.completable;

import lombok.Getter;


import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/**
 * @author Dongguo
 * @date 2021/9/5 0005-11:16
 * @description: 案例 电商比价需求
 */
public class CompletableFutureNetMallDemo {
    //模拟查询商铺列表
    static List list = Arrays.asList(
            new NetMall("jd"),
            new NetMall("pdd"),
            new NetMall("tmall")
    );
    //同步 一个一个查
    public static List getPriceByStep(List list, String productName) {
        return list.stream()
                .map(netMall -> String.format(productName + " in %s price is %.2f", netMall.getMallName(), netMall.calcPrice(productName)))
                .collect(Collectors.toList());
    }
    //异步 并发查
    public static List getPriceByASync(List list, String productName) {
        return list.stream()
                .map(netMall -> CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f", netMall.getMallName(), netMall.calcPrice(productName))))
                .collect(Collectors.toList())
                .stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        //同步
        List listPrice1 = getPriceByStep(list, "mysql");
        for (String price : listPrice1) {
            System.out.println(price);
        }
        long end = System.currentTimeMillis();
        System.out.println("costTime: "+(end - start)+"毫秒");
        System.out.println("-------------------------");
        long start2 = System.currentTimeMillis();
        //异步
        List listPrice2 = getPriceByASync(list, "mysql");
        for (String price : listPrice2) {
            System.out.println(price);
        }
        long end2 = System.currentTimeMillis();
        System.out.println("costTime: "+(end2 - start2)+"毫秒");
    }
}

class NetMall {
    @Getter
    private String mallName;

    public NetMall(String mallName) {
        this.mallName = mallName;
    }

    //模拟查询数据库获得商品价格
    public double calcPrice(String name) {
        //阻塞1秒 模拟查询数据库
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        //生成0-1之间的一个随机数*2 + 查询书籍名称第一个字符    。得到的是ASCII对应的数值
        double price = ThreadLocalRandom.current().nextDouble() * 2 + name.charAt(0);
        //返回价格
        return price;
    }
}
运行结果
mysql in jd price is 109.78
mysql in pdd price is 109.85
mysql in tmall price is 109.74
costTime: 3158毫秒
-------------------------
mysql in jd price is 110.35
mysql in pdd price is 109.95
mysql in tmall price is 109.01
costTime: 1012毫秒

异步明显比同步快

而且list越大越明显(当然也受到机器CPU核数影响)

线程池可以根据具体场景进行设置自定义线程池

CompletableFuture使用场景

在一个电商平台,平台入驻商家,主页搜索华为手机,查询全部商家商铺卖的手机的价格列表

如果有成千上万个商家提供的商品,需要查询成千上万次。

一次一次的查? ×

使用CompletableFuture异步查询 ? √

CompletableFuture的优点

异步任务结束或异常时,会自动回调某个对象的方法;

主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行

关注
打赏
1638062488
查看更多评论
立即登录/注册

微信扫码登录

0.3595s