Future接口定义了操作异步任务执行一些方法,如获取异步任务的执行结果、取消任务的执行、判断任务是否被取消、判断任务执行是否完毕等。比如我们将任务提交到线程池里面,然后我们会得到一个Futrue,在Future里面有isDone方法来 判断任务是否处理结束,还有get方法可以一直阻塞直到任务结束然后获取结果,(最好放在最后)但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。
Future的主要缺点如下:(1)不支持手动完成 我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果,现在没法把这个任务结果通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成 (2)不支持进一步的非阻塞调用 通过Future的get方法会一直阻塞到任务完成,但是想在获取任务之后执行额外的任务,因为Future不支持回调函数,所以无法实现这个功能 (3)不支持链式调用 对于Future的执行结果,我们想继续传到下一个Future处理使用,从而形成一个链式的pipline调用,这在Future中是没法实现的。 (4)不支持多个Future合并 比如我们有10个Future并行执行,我们想在所有的Future运行完毕之后,执行某些函数,是没法通过Future实现的。 (5)不支持异常处理 Future的API没有任何的异常处理的api,所以在异步运行时,如果出了问题是不好定位的。
以上这些场景Future无法完成,需要使用到CompletableFuture
CompletableFutureCompletableFuture在Java8里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。
CompletableFuture实现了Future, CompletionStage接口,实现了Future接口就可以兼容现在有线程池框架,而CompletionStage接口才是异步编程的接口抽象,里面定义多种异步方法,可以通过回调的方式处理计算结果,也可以提供转换和组合CompletableFuture的方法。
通过这两者集合,从而打造出了强大的CompletableFuture类。
public class CompletableFuture implements Future, CompletionStage {}
CompletableFuture实现了FutureTask相同的功能并解决了FutureTask的痛点问题,提供了更强大的异步编排功能
CompletionStage接口代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段,有些类似Linux系统的管道分隔符传参数。
ps -ef|grep java
ps -ef 先查询一次所有列表
grep java 再查出java的列表
使用CompletableFuture场景:主线程里面创建一个CompletableFuture,然后主线程调用get方法会阻塞,执行完子线程再执行主线程
get()与join()的区别get需要处理异常
join不会抛出异常
都会阻塞 建议使用join,使用join能少抛异常 因为CompletableFuture提供了处理异常的API
get()、 get(long timeout, TimeUnit unit)、getNow(T valueIfAbsent)package com.dongguo.completable;
import java.util.concurrent.*;
/**
* @author Dongguo
* @date 2021/9/5 0005-13:41
* @description:
*/
public class CompletableFutureAPIDemo {
public static void main(String[] args) {
//自定义线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
5,
1L,
TimeUnit.SECONDS,
new ArrayBlockingQueue(10));
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}, threadPoolExecutor);
try {
// System.out.println(future.get());//不见不散
// System.out.println(future.get(1,TimeUnit.SECONDS));//过时不候 超时退出
System.out.println(future.getNow(999));//立即获取结果,如果没有计算完,返回设置的默认值
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPoolExecutor.shutdown();
}
}
}
complete(T value) 是否打断get方法立即返回括号值
一般与get()共同存在
打断成功返回默认值
打断失败返回计算的值
package com.dongguo.completable;
import java.util.concurrent.*;
/**
* @author Dongguo
* @date 2021/9/5 0005-13:41
* @description:
*/
public class CompletableFutureAPIDemo {
public static void main(String[] args) {
//自定义线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
5,
1L,
TimeUnit.SECONDS,
new ArrayBlockingQueue(10));
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}, threadPoolExecutor);
try {
System.out.println(future.complete(-1) + "\t" + future.get());
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPoolExecutor.shutdown();
}
}
}
运行结果:
true -1
打断成功返回默认值
package com.dongguo.completable;
import java.util.concurrent.*;
/**
* @author Dongguo
* @date 2021/9/5 0005-13:41
* @description:
*/
public class CompletableFutureAPIDemo {
public static void main(String[] args) {
//自定义线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
5,
1L,
TimeUnit.SECONDS,
new ArrayBlockingQueue(10));
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}, threadPoolExecutor);
try {
TimeUnit.SECONDS.sleep(3);
System.out.println(future.complete(-1) + "\t" + future.get());
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPoolExecutor.shutdown();
}
}
}
运算结果
false 1
打断失败 返回计算的值
没有返回值的异步任务runAsyncpublic static CompletableFuture runAsync(Runnable runnable)
package com.dongguo.completable;
import java.util.concurrent.CompletableFuture;
/**
* @author Dongguo
* @date 2021/8/24 0024-22:36
* @description: 没有返回值的异步任务
*/
public class CompletableFutureDemo1 {
public static void main(String[] args) throws Exception {
System.out.println("主线程开始");
//运行一个没有返回值的异步任务
CompletableFuture future = CompletableFuture.runAsync(() -> {
try {
System.out.println("子线程启动");
Thread.sleep(5000);
System.out.println("子线程完成");
} catch (Exception e) {
e.printStackTrace();
}
});
//主线程阻塞
future.join();
System.out.println("主线程结束");
}
}
运行结果:
主线程开始
子线程启动
子线程完成
主线程结束
返回值类型CompletableFuture 参数是Void 相当于void的包装类
public final
class Void {
使用join
package com.dongguo.completable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/8/24 0024-22:36
* @description: 没有返回值的异步任务
*/
public class CompletableFutureDemo1 {
public static void main(String[] args) {
//自定义线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
5,
1L,
TimeUnit.SECONDS,
new ArrayBlockingQueue(10));
try {
System.out.println("主线程开始");
//运行一个没有返回值的异步任务
CompletableFuture future = CompletableFuture.runAsync(() -> {
System.out.println("子线程启动");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("子线程完成");
}, threadPoolExecutor);
future.join();
System.out.println("主线程结束");
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPoolExecutor.shutdown();
}
}
}
运行结果:
主线程开始
子线程启动
子线程完成
主线程结束
public static CompletableFuture runAsync(Runnable runnable,Executor executor)
Executor executor参数说明
没有指定Executor的方法,直接使用默认的ForkJoinPool.commonPool() 作为它的线程池执行异步代码。
如果指定线程池,则使用我们自定义的或者特别指定的线程池执行异步代码
注意:自定义线程池使用完后要关闭线程池
package com.dongguo.completable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/8/24 0024-22:36
* @description: 没有返回值的异步任务
*/
public class CompletableFutureDemo1 {
public static void main(String[] args) {
//自定义线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
5,
1L,
TimeUnit.SECONDS,
new ArrayBlockingQueue(10));
try {
System.out.println("主线程开始");
//运行一个没有返回值的异步任务
CompletableFuture future = CompletableFuture.runAsync(() -> {
System.out.println("子线程启动");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("子线程完成");
}, threadPoolExecutor);
//主线程阻塞
future.get();
System.out.println("主线程结束");
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPoolExecutor.shutdown();
}
}
}
有返回值的异步任务supplyAsync
public static CompletableFuture supplyAsync(Supplier supplier)
package com.dongguo.completable;
import java.util.concurrent.CompletableFuture;
/**
* @author Dongguo
* @date 2021/8/25 0025-7:56
* @description: 有返回值的异步任务
*/
public class CompletableFutureDemo2 {
public static void main(String[] args) throws Exception {
System.out.println("主线程开始");
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("子线程启动");
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
return "子线程完成";
});
//主线程阻塞
String result = future.join();
System.out.println("主线程结束,子线程结果为:" + result);
}
}
运行结果:
主线程开始
子线程启动
主线程结束,子线程结果为:子线程完成
public static CompletableFuture supplyAsync(Supplier supplier,Executor executor)
package com.dongguo.completable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/8/25 0025-7:56
* @description: 有返回值的异步任务
*/
public class CompletableFutureDemo2 {
public static void main(String[] args) throws Exception {
//自定义线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
5,
1L,
TimeUnit.SECONDS,
new ArrayBlockingQueue(10));
System.out.println("主线程开始");
//运行一个没有返回值的异步任务
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("子线程启动");
Thread.sleep(5000);
} catch (Exception e) {
e.printStackTrace();
}
return "子线程完成";
},threadPoolExecutor);
//主线程阻塞
String result = future.join();
System.out.println("主线程结束,子线程结果为:" + result);
threadPoolExecutor.shutdown();
}
}
future.get();依然会阻塞主线程
线程依赖thenApply 由于存在依赖关系(当前一步出错,不走下一步),当前步骤有异常的话就叫停。当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。
package com.dongguo.completable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/8/25 0025-8:03
* @description: 先对一个数加10, 然后取平方
*/
public class CompletableFutureDemo3 {
private static Integer num = 10;
public static void main(String[] args) throws Exception {
//自定义线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
5,
1L,
TimeUnit.SECONDS,
new ArrayBlockingQueue(10));
System.out.println("主线程开始");
//运行一个没有返回值的异步任务
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
try {
System.out.println("子线程启动 执行加10");
num += 10;
} catch (Exception e) {
e.printStackTrace();
}
return num;
}).thenApply((i) -> {
System.out.println("子线程启动 执行平方");
return i * i;
}).whenComplete((v, e) -> {
if (e == null) {
System.out.println("result:"+v);
}
}).exceptionally((e) -> {
e.printStackTrace();
return null;
});
future.join();
System.out.println("主线程结束");
threadPoolExecutor.shutdown();
}
}
主线程开始
子线程启动 执行加10
子线程启动 执行平方
子线程启动 执行减100
result:300
300
主线程结束
线程依赖不想串行可以使用异步执行thenApplyAsync
异常情况
package com.dongguo.completable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/8/25 0025-8:03
* @description:
*/
public class CompletableFutureDemo3 {
private static Integer num = 10;
public static void main(String[] args) throws Exception {
//自定义线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
5,
1L,
TimeUnit.SECONDS,
new ArrayBlockingQueue(10));
System.out.println("主线程开始");
//运行一个没有返回值的异步任务
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
System.out.println("子线程启动 执行加10");
num += 10;
return num;
}).thenApply((i) -> {
System.out.println("子线程启动 执行平方");
int m = 10 / 0;//人为异常
return i * i;
}).thenApply((i) -> {
System.out.println("子线程启动 执行减100");
return i - 100;
}).whenComplete((v, e) -> {
if (e == null) {
System.out.println("result:" + v);
}
}).exceptionally((e) -> {
e.printStackTrace();
return null;
});
System.out.println(future.join());
System.out.println("主线程结束");
threadPoolExecutor.shutdown();
}
}
主线程开始
子线程启动 执行加10
子线程启动 执行平方
null
主线程结束
java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:618)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
at java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:631)
at java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
at com.dongguo.completable.CompletableFutureDemo3.main(CompletableFutureDemo3.java:30)
Caused by: java.lang.ArithmeticException: / by zero
at com.dongguo.completable.CompletableFutureDemo3.lambda$main$1(CompletableFutureDemo3.java:32)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
... 4 more
出现异常,不再计算之后的运算
handle有异常也可以往下一步走,根据带的异常参数可以进一步处理package com.dongguo.completable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/8/25 0025-8:03
* @description:
*/
public class CompletableFutureDemo3 {
private static Integer num = 10;
public static void main(String[] args) throws Exception {
//自定义线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
5,
1L,
TimeUnit.SECONDS,
new ArrayBlockingQueue(10));
System.out.println("主线程开始");
//运行一个没有返回值的异步任务
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
System.out.println("子线程启动 执行加10");
num += 10;
return num;
}).handle((i, e) -> {
System.out.println("子线程启动 执行平方");
int m = 10 / 0;//人为异常
return i * i;
}).handle((i, e) -> {
System.out.println("子线程启动 执行减100");
return i - 100;
}).whenComplete((v, e) -> {
if (e == null) {
System.out.println("result:" + v);
}
}).exceptionally((e) -> {
e.printStackTrace();
return null;
});
System.out.println(future.join());
System.out.println("主线程结束");
threadPoolExecutor.shutdown();
}
}
主线程开始
子线程启动 执行加10
子线程启动 执行平方
子线程启动 执行减100
null
主线程结束
java.util.concurrent.CompletionException: java.lang.NullPointerException
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:838)
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1609)
at java.util.concurrent.CompletableFuture$AsyncSupply.exec(CompletableFuture.java:1596)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172)
Caused by: java.lang.NullPointerException
at com.dongguo.completable.CompletableFutureDemo3.lambda$main$2(CompletableFutureDemo3.java:36)
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)
... 8 more
出现异常,计算仍然执行
成功执行whenComplete返回结果和异常package com.dongguo.completable;
import java.util.concurrent.CompletableFuture;
/**
* @author Dongguo
* @date 2021/8/25 0025-8:12
* @description:
*/
public class CompletableFutureDemo {
private static Integer num = 10;
public static void main(String[] args) throws Exception {
System.out.println("主线程开始");
//运行一个没有返回值的异步任务
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
System.out.println("子线程启动");
num += 10;
return num;
}).thenApply((f)->{
return f+1;
}).whenComplete((v,e)->{
if (e == null){
System.out.println("result= "+v);
}
});
}
}
运行结果:
主线程开始
子线程启动
为什么没有打印出result呢
注意 因为是异步执行,子线程在处理任务,主线程却执行结束了,
这样所有的任务都被迫结束退出了。
解决 要保证主线程执行的时间要超过子线程执行的时间
package com.dongguo.completable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/8/25 0025-8:12
* @description:
*/
public class CompletableFutureDemo {
private static Integer num = 10;
public static void main(String[] args) throws Exception {
System.out.println("主线程开始");
//运行一个没有返回值的异步任务
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
System.out.println("子线程启动");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
num += 10;
return num;
}).thenApply((f) -> {
return f + 1;
}).whenComplete((v, e) -> {
if (e == null) {
System.out.println("result= " + v);
}
});
//主线程不要立刻结束,否则CompletableFuture默认使用的线程池会立刻关闭:暂停3秒钟线程
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
运行结果
主线程开始
子线程启动
result= 21
消费处理结果thenAccept
thenAccept 消费处理结果, 接收任务的处理结果,并消费处理,无返回结果。
package com.dongguo.completable;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
/**
* @author Dongguo
* @date 2021/8/25 0025-8:08
* @description: 消费处理结果thenAccept
*/
public class CompletableFutureDemo4 {
private static Integer num = 10;
public static void main(String[] args) throws Exception {
System.out.println("主线程开始");
CompletableFuture.supplyAsync(() -> {
try {
System.out.println("子线程启动 执行加10");
num += 10;
} catch (Exception e) {
e.printStackTrace();
}
return num;
}).thenApply((i) -> {
return i * i;
}).thenAccept(new Consumer(){
@Override
public void accept(Integer integer) {
System.out.println("子线程全部处理完成,最后调用了accept,结果为:" + integer);
}
});
System.out.println("主线程结束");
}
}
运行结果:
主线程开始
子线程启动 执行加10
子线程全部处理完成,最后调用了accept,结果为:400
主线程结束
异常处理exceptionally、handle
exceptionally异常处理,出现异常时触发
package com.dongguo.completable;
import java.util.concurrent.CompletableFuture;
/**
* @author Dongguo
* @date 2021/8/25 0025-8:12
* @description:
*/
public class CompletableFutureDemo5 {
private static Integer num = 10;
public static void main(String[] args) throws Exception {
System.out.println("主线程开始");
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
System.out.println("子线程启动");
int i = 1 / 0;
num += 10;
return num;
}).exceptionally((e) -> {
System.out.println(e.getMessage());
return -1;
});
//主线程阻塞
int result = future.join();
System.out.println("主线程结束,子线程结果为:" + result);
}
}
运行结果:
主线程开始
子线程启动
java.lang.ArithmeticException: / by zero
主线程结束,子线程结果为:-1
handle类似于thenAccept/thenRun方法,是最后一步的处理调用,但是同时可以处理异常
package com.dongguo.completable;
import java.util.concurrent.CompletableFuture;
/**
* @author Dongguo
* @date 2021/8/25 0025-8:16
* @description:
*/
public class CompletableFutureDemo6 {
private static Integer num = 10;
public static void main(String[] args) throws Exception {
System.out.println("主线程开始");
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
System.out.println("子线程启动");
int i = 1 / 0;
num += 10;
return num;
}).handle((i, e) -> {
if (e != null) {
System.out.println("发生了异常,内容为:" + e.getMessage());
return -1;
} else {
System.out.println("正常完成,内容为: " + i);
return i;
}
});
//主线程阻塞
int result = future.join();
System.out.println("主线程结束,子线程结果为:" + result);
}
}
主线程开始
子线程启动
发生了异常,内容为:java.lang.ArithmeticException: / by zero
主线程结束,子线程结果为:-1
结果合并thenCompose、thenCombine
thenCompose合并两个有依赖关系的CompletableFutures的执行结果
package com.dongguo.completable;
import java.util.concurrent.CompletableFuture;
/**
* @author Dongguo
* @date 2021/8/25 0025-8:25
* @description:
*/
public class CompletableFutureDemo7 {
private static Integer num = 10;
public static void main(String[] args) throws Exception {
System.out.println("主线程开始");
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
System.out.println("子线程启动");
num += 10;
return num;
});
//合并
CompletableFuture future2 = future.thenCompose(i ->
CompletableFuture.supplyAsync(() -> {
return i + 1;
}));
//主线程阻塞
int result = future.join();
int result2 = future2.join();
System.out.println("主线程结束,子线程结果为:" + result);
System.out.println("合并后结果为:"+result2);
}
}
运行结果:
主线程开始
子线程启动
主线程结束,子线程结果为:20
合并后结果为:21
thenCombine合并两个没有依赖关系的CompletableFutures任务
案例1简化版
package com.dongguo.completable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
/**
* @author Dongguo
* @date 2021/8/25 0025-8:30
* @description:
*/
public class CompletableFutureAPIDemo1 {
public static void main(String[] args) throws Exception {
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
return 10;
}).thenCombine(CompletableFuture.supplyAsync(() -> {
return 20;
}), (r1, r2) -> {
return r1 + r2;
});
System.out.println("合并结果为" + future.join());
}
}
运行结果:
合并结果为30
案例2标准版
package com.dongguo.completable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
/**
* @author Dongguo
* @date 2021/8/25 0025-8:30
* @description:
*/
public class CompletableFutureDemo8 {
private static Integer num = 10;
public static void main(String[] args) throws Exception {
System.out.println("主线程开始");
//运行一个没有返回值的异步任务
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
System.out.println("子线程启动执行加10");
num += 10;
return num;
});
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("子线程启动执行乘10");
num = num * 10;
return num;
});
//合并两个结果
CompletableFuture futureTotal = future.thenCombine(future2, (f1, f2) -> {
List list = new ArrayList();
list.add(f1);
list.add(f2);
return list;
});
System.out.println("子线程启动执行加10结果为" + future.join());
System.out.println("子线程启动执行乘10结果为" + future2.join());
System.out.println("主线程结束,合并后结果为:" + futureTotal.join());
}
}
运行结果:
主线程开始
子线程启动执行加10
子线程启动执行乘10
子线程启动执行加10结果为20
子线程启动执行乘10结果为200
主线程结束,合并后结果为:[20, 200]
合并多个任务的结果allOf与anyOf
allOf: 一系列独立的future任务,等其所有的任务执行完后做一些事情
package com.dongguo.completable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
/**
* @author Dongguo
* @date 2021/8/25 0025-8:39
* @description:
*/
public class CompletableFutureDemo9 {
private static Integer num = 10;
public static void main(String[] args) throws Exception {
System.out.println("主线程开始");
List list = new ArrayList();
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("子线程启动执行加10");
num += 10;
return num;
});
list.add(future);
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("子线程启动执行乘10");
num = num * 10;
return num;
});
list.add(future2);
CompletableFuture future3 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("子线程启动执行减10");
num -= 10;
return num;
});
list.add(future3);
CompletableFuture future4 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(4);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("子线程启动执行除以10");
num = num / 10;
return num;
});
list.add(future4);
//多任务合并
// List collect = list.stream()
// .map(CompletableFuture::join)
// .collect(Collectors.toList());
// System.out.println("主线程结束,合并后结果为:" + collect);
CompletableFuture allOfFuture = CompletableFuture.allOf(future, future2, future3, future4);
/*
* 因为allof没有返回值,所以通过theApply,
* 给allOfFuture附上一个回调函数。在回调函数里面,以此调用么一个Future的Get()函数,
* 获取到4个结果,存入List*/
CompletableFuture completableFuture = allOfFuture.thenApply((v) -> {
return list.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
});
System.out.println("主线程结束,合并后结果为:" + completableFuture.join());
}
}
运行结果:
主线程开始
子线程启动执行加10
子线程启动执行乘10
子线程启动执行减10
子线程启动执行除以10
主线程结束,合并后结果为:[20, 200, 190, 19]
anyOf: 只要在多个future里面有一个返回,整个任务就可以结束,而不需要等到每一个future结束
package com.dongguo.completable;
import java.util.concurrent.CompletableFuture;
/**
* @author Dongguo
* @date 2021/8/25 0025-8:46
* @description:
*/
public class CompletableFutureDemo10 {
private static Integer num = 10;
public static void main(String[] args) throws Exception {
System.out.println("主线程开始");
CompletableFuture[] futures = new CompletableFuture[4];
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(5000);
System.out.println("子线程启动执行加10");
num += 10;
return num;
} catch (InterruptedException e) {
e.printStackTrace();
return 0;
}
});
futures[0] = future;
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
System.out.println("子线程启动执行乘10");
num = num * 10;
return num;
} catch (InterruptedException e) {
e.printStackTrace();
return 0;
}
});
futures[1] = future2;
CompletableFuture future3 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
System.out.println("子线程启动执行减10");
num -= 10;
return num;
} catch (InterruptedException e) {
e.printStackTrace();
return 0;
}
});
futures[2] = future3;
CompletableFuture future4 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(4000);
System.out.println("子线程启动执行除以10");
num = num / 10;
return num;
} catch (InterruptedException e) {
e.printStackTrace();
return 0;
}
});
futures[3] = future4;
//多任务合并 anyOf有一个任务完成就返回 future2先执行完 返回future2执行的结果10*10=100
CompletableFuture futureOne = CompletableFuture.anyOf(futures);
System.out.println("主线程结束,合并后结果为:" + futureOne.join());
}
}
运行结果:
主线程开始
子线程启动执行乘10
主线程结束,合并后结果为:100
applyToEither谁快用谁 最快返回输出的线程结果作为下一次任务的输入
package com.dongguo.completable;
import java.util.concurrent.*;
/**
* @author Dongguo
* @date 2021/9/5 0005-13:41
* @description:
*/
public class CompletableFutureAPIDemo {
public static void main(String[] args) {
//自定义线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
2,
5,
1L,
TimeUnit.SECONDS,
new ArrayBlockingQueue(10));
CompletableFuture future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
},threadPoolExecutor).applyToEither(CompletableFuture.supplyAsync(()->{
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 10;
}),(r)->{
return r;
});
try {
System.out.println(future.join());
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPoolExecutor.shutdown();
}
}
}
运行结果
10
总结
1handle+whenComplete 相当于try-finally
exceptionally 相当于try-catch
2不带async的方法表示执行当前任务的线程执行接下的任务
带async的方法表示将接下来的任务交给线程池其他线程来进行执行
3get() join() 建议使用join 少抛异常
4使用默认线程池,要确保主线程运行时间超过子线程
使用自定义线程池,使用后要关闭线程池
5CompletableFuture任务执行三剑客
thenRun(Runnable runnable)
任务 A 执行完执行 B,并且 B 不需要 A 的结果
thenAccept(Consumer action)
任务 A 执行完执行 B,B 需要 A 的结果,但是任务 B 无返回值
thenApply(Function fn)
任务 A 执行完执行 B,B 需要 A 的结果,同时任务 B 有返回值
模拟代码:
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {}).join());
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {}).join());
System.out.println(CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB").join());
案例 电商比价需求
1同一款产品,同时搜索出同款产品在各大电商的售价;
2同·款产品,同时搜索出本产品在某一个电商平台下,各个入驻门店的售价是多少
出来结果希望是同款产品的在不同地方的价格清单列表,返回一个List
《mysql》 in jd price is 88.05
《mysql》 in pdd price is 86.11
《mysql》 in taobao price is 90.43
package com.dongguo.completable;
import lombok.Getter;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* @author Dongguo
* @date 2021/9/5 0005-11:16
* @description: 案例 电商比价需求
*/
public class CompletableFutureNetMallDemo {
//模拟查询商铺列表
static List list = Arrays.asList(
new NetMall("jd"),
new NetMall("pdd"),
new NetMall("tmall")
);
//同步 一个一个查
public static List getPriceByStep(List list, String productName) {
return list.stream()
.map(netMall -> String.format(productName + " in %s price is %.2f", netMall.getMallName(), netMall.calcPrice(productName)))
.collect(Collectors.toList());
}
//异步 并发查
public static List getPriceByASync(List list, String productName) {
return list.stream()
.map(netMall -> CompletableFuture.supplyAsync(() -> String.format(productName + " in %s price is %.2f", netMall.getMallName(), netMall.calcPrice(productName))))
.collect(Collectors.toList())
.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
}
public static void main(String[] args) {
long start = System.currentTimeMillis();
//同步
List listPrice1 = getPriceByStep(list, "mysql");
for (String price : listPrice1) {
System.out.println(price);
}
long end = System.currentTimeMillis();
System.out.println("costTime: "+(end - start)+"毫秒");
System.out.println("-------------------------");
long start2 = System.currentTimeMillis();
//异步
List listPrice2 = getPriceByASync(list, "mysql");
for (String price : listPrice2) {
System.out.println(price);
}
long end2 = System.currentTimeMillis();
System.out.println("costTime: "+(end2 - start2)+"毫秒");
}
}
class NetMall {
@Getter
private String mallName;
public NetMall(String mallName) {
this.mallName = mallName;
}
//模拟查询数据库获得商品价格
public double calcPrice(String name) {
//阻塞1秒 模拟查询数据库
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
//生成0-1之间的一个随机数*2 + 查询书籍名称第一个字符 。得到的是ASCII对应的数值
double price = ThreadLocalRandom.current().nextDouble() * 2 + name.charAt(0);
//返回价格
return price;
}
}
运行结果
mysql in jd price is 109.78
mysql in pdd price is 109.85
mysql in tmall price is 109.74
costTime: 3158毫秒
-------------------------
mysql in jd price is 110.35
mysql in pdd price is 109.95
mysql in tmall price is 109.01
costTime: 1012毫秒
异步明显比同步快
而且list越大越明显(当然也受到机器CPU核数影响)
线程池可以根据具体场景进行设置自定义线程池
CompletableFuture使用场景在一个电商平台,平台入驻商家,主页搜索华为手机,查询全部商家商铺卖的手机的价格列表
如果有成千上万个商家提供的商品,需要查询成千上万次。
一次一次的查? ×
使用CompletableFuture异步查询 ? √
CompletableFuture的优点异步任务结束或异常时,会自动回调某个对象的方法;
主线程设置好回调后,不再关心异步任务的执行,异步任务之间可以顺序执行