通常我们编写的Java程序,都是一行执行完了,再执行下一行.上一行没有执行完,下一行就不会执行.这种就是指令式编程.
另外一种就是响应式编程,既可以满足当前一行命令还没有执行完时,下一行命令就可以启动执行.当需要上一行命令的结果时再去获取相关值(或者上一行结果在执行完后主动的告诉我们)cuiyaonan2000@163.com
CompletableFuture在Java8 之前就有Future提供了响应式编程的实现方式.但是有一些缺陷.所以提供了CompletableFuture,从这个名字上我们就能知道该新特性是比之前的Future更完善和强大的^_^.---其实如果你要自己实现也是可以的,使用多线程的方式Runnable来获取其它线程返回的值cuiyaonan2000@163.com
包目录CompletableFuture 在 java.util.concurrent这个万众瞩目的目录下 .多少同学的面试必背目录cuiyaonan2000@163.com
如下只有一个公有的.还有个私有的构造方法,但是我们没法调用.
package cui.yao.nan.completablefuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
/**
* @Author: cuiyaonan2000@163.com
* @Description: todo
* @Date: Created at 2022-1-12 15:29
*/
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//CompletableFuture 的构造方法只有这种
CompletableFuture completableFuture = new CompletableFuture();
//此处必须要设置,否则completableFuture.get()就会一直阻塞
completableFuture.complete("cuiyaonan2000@163.com");
//get()方法会一直阻塞直到 Future 完成.所以必须有上面这部模拟完成
String result = completableFuture.get();
System.out.println(result);
}
}
异步任务
如上构造方法创建一个子任务的方式其实非常少用.
实际使用CompletableFuture使用的创建异步子任务的方式还是要靠如下的方法cuiyaonan2000@163.com
另:CompletableFuture的方法中 Async 后缀的均为异步操作
runAsync
由上runAsyn其实就是传入一个Runnable的对象,执行异步任务.
那Runnable是有啥子特点呢???就是没有返回值cuiyaonan2000@163.com
runAsync是Runnable任务,不带返回值的,如果入参有executor,则使用executor来执行异步任务
CompletableFuture runAsync = CompletableFuture.runAsync(()->{
System.out.println("cuiyaonan2000@163.com begin");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("cuiyaonan2000@163.com end");
});
//如下的get很重要,如果没有get,则当主程序运行完了.子线程也会挂掉,即 System.out.println("cuiyaonan2000@163.com end");不会执行
runAsync.get();
supplyAsync
supplyAsync是带返回值的,如果入参不带executor,则默认使用ForkJoinPool.commonPool()作为执行异步任务的线程池;否则使用executor执行任务。
主要区别就是线程中必须有return,可以参考Callable
CompletableFuture supplyAsync = CompletableFuture.supplyAsync(()->{
System.out.println("cuiyaonan2000@163.com begin");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("cuiyaonan2000@163.com end");
return "1";
});
//如下的get很重要,如果没有get,则当主程序运行完了.子线程也会挂掉,即 System.out.println("cuiyaonan2000@163.com end");不会执行
System.out.println(supplyAsync.get());
任务触发器
该类方法主要是当异步任务完成后设置的,所以它是提供给任务的触发器,所包含的方法如下:
直接让异步任务结束,同时可以这只一个返回值.
CompletableFuture whenComplete = CompletableFuture.supplyAsync(()->{
System.out.println("cuiyaonan2000@163.com begin");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("cuiyaonan2000@163.com end");
return "1";
}).whenComplete((a,e)->{
System.out.println("异步任务返回的结果是:" +a +" 抛出的异常是:" +e);
int i =1/0;
}).exceptionally(e->{
System.out.println("异常信息是"+e.getCause());
return "2";
});
whenComplete.complete(11);
System.out.println(whenComplete.get());
completeExceptionally
这个更狠 直接让子任务异常退出,注意是异常退出.即使有异常捕获也不能抓到.
CompletableFuture whenComplete = CompletableFuture.supplyAsync(()->{
System.out.println("cuiyaonan2000@163.com begin");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("cuiyaonan2000@163.com end");
return "1";
}).whenComplete((a,e)->{
System.out.println("异步任务返回的结果是:" +a +" 抛出的异常是:" +e);
int i =1/0;
}).exceptionally(e->{
System.out.println("异常信息是"+e.getCause());
return "2";
});
whenComplete.completeExceptionally(new Exception("121323"));
System.out.println(whenComplete.get());
whenComplete
顾明思议当任务完成时触发.同时它提供了多钟,主要的有2种.即同步的,还是异步的.如果是同步的则继续使用supplyAsync的线程,如果不是同步的则从线程池中在拿出一个线程执行
CompletableFuture whenComplete = CompletableFuture.supplyAsync(()->{
System.out.println("cuiyaonan2000@163.com begin");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("cuiyaonan2000@163.com end");
return "1";
}).whenComplete((a,e)->{
System.out.println("异步任务返回的结果是:" +a +" 抛出的异常是:" +e);
});
//如下的get很重要,如果没有get,则当主程序运行完了.子线程也会挂掉,即 System.out.println("cuiyaonan2000@163.com end");不会执行
whenComplete.get();
exceptionally
此即为try cahtch 的finally 接收异常信息,并返回一个结果
CompletableFuture whenComplete = CompletableFuture.supplyAsync(()->{
System.out.println("cuiyaonan2000@163.com begin");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("cuiyaonan2000@163.com end");
return "1";
}).whenComplete((a,e)->{
System.out.println("异步任务返回的结果是:" +a +" 抛出的异常是:" +e);
int i =1/0;
}).exceptionally(e->{
System.out.println("异常信息是"+e.getCause());
return "2";
});
whenComplete.get();
handle()
API提供了一个更通用的方法 - handle()
从异常恢复,无论一个异常是否发生它都会被调用。
- 如果异常发生:结果参数将是 null,异常参数有值
- 如果正常执行:异常参数为null,结果参数有值
thenApply
runAsync或者
supplyAsync的任务在get()时整个主进程都会阻塞,除非运行结束.所以可以只用thenApply在包一层.
thenApply会获取supplyAsync或者runAsync的返回值,然后在处理后在返回一个.感觉没毛用.因为thenapply也必须要get.
同理区分同步的,还是异步的.如果是同步的则继续使用supplyAsync的线程,如果不是同步的则从线程池中在拿出一个线程执行
另:thenApply返回的是另外一个CompletableFuture.
CompletableFuture thenApply = CompletableFuture.supplyAsync(()->{
System.out.println("cuiyaonan2000@163.com thenApply begin");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("cuiyaonan2000@163.com thenApply end");
return "1";
}).thenApply((a)->{
return "异步调用返回a"+a;
}).thenApply((a)->{
return "异步调用返回a"+a;
});
System.out.println(thenApply.get());
thenAccept与 thenRun
thenAccept 与 thenRun 也是返回另外一个CompletableFuture
跟thenApply不同之处在于,不会返回值,只是做一些处理.
// thenAccept() example
CompletableFuture.supplyAsync(() -> {
return ProductService.getProductDetail(productId);
}).thenAccept(product -> {
System.out.println("Got product detail from remote service " + product.getName())
});
// thenRun() example
CompletableFuture.supplyAsync(() -> {
// Run some computation
}).thenRun(() -> {
// Computation Finished.
});
thenCompose(类似于thenApply)
区别与thenApply是将方法体内返回内容组装成completableFuture返回,若果方法体中返回的本身就是completablefuture则返回 CompletableFuture 这种结果
thenCompose则去掉CompletableFutre嵌套,只返回CompletableFuture
区别如下:
package cui.yao.nan.completablefuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
/**
* @Author: cuiyaonan2000@163.com
* @Description: todo
* @Date: Created at 2022-1-13 9:57
*/
public class Test2 {
public static void main(String[] args){
CompletableFuture a = CompletableFuture.supplyAsync(()->{
return 1;
}).thenApply(result->{
return 2;
});
CompletableFuture b = CompletableFuture.supplyAsync(()->{
return 1;
}).thenApply(result->{
return CompletableFuture.supplyAsync(()->{
return 3;
});
});
CompletableFuture c = CompletableFuture.supplyAsync(()->{
return 1;
}).thenCompose(result->{
return CompletableFuture.supplyAsync(()->{
return 3;
});
});
try {
//这里必须连续使用2次get()才能获取到结果
System.out.println( b.get().get());
//这里使用1次get()就能获取返回结果
System.out.println( c.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
thenCombine
thenCombine常用于将2个CompletableFuture返回的结果进行组装,即2个
2是该方法的主要特征cuiyaonan2000@163.com
package cui.yao.nan.completablefuture;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
/**
* @Author: cuiyaonan2000@163.com
* @Description: todo
* @Date: Created at 2022-1-13 9:57
*/
public class Test2 {
public static void main(String[] args){
CompletableFuture a = CompletableFuture.supplyAsync(()->{
return 1;
}).thenCombine(CompletableFuture.supplyAsync(()->{
return 2;
}),(result1,result2)->{
return result1+ result2;
});
try {
System.out.println(a.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
allOf
用于处理收集所有的结果
如下所示虽然可以同时收集多个CompletableFuture,但是返回的是CompletableFuture
所以不能直接处理每个CompletableFuture的返回结果,但是设计上可以灵活变通,利用allOf当数组的内的所有CompletableFuture全都执行完了才会执行thenApply,来遍历原始的数组通过get()获取每个CompletableFuture的值并加以处理----------原理就是等所有的CompletableFuture全都处理完了,统一处理结果值,因为allOf的get必须是,所有的CompletableFuture全都执行完了才能继续执行cuiyaonan2000@163.com
另:allOf仅针对CompletableFuture的当前线程执行完,例如:如果使用了whenCompleteAsync,只能保证supplyAsync执行完,whenCompleteAsync是否执行完allOf 不会管理cuiyaonan2000@163.com
实例:
package cui.yao.nan.completablefuture;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
/**
* @Author: cuiyaonan2000@163.com
* @Description: todo
* @Date: Created at 2022-1-13 9:57
*/
public class Test2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
List completableFutureList = new ArrayList();
CompletableFuture a = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("a运行结束");
return 1;
});
completableFutureList.add(a);
CompletableFuture b = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("b运行结束");
return 2;
});
completableFutureList.add(b);
CompletableFuture c = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("c运行结束");
return 3;
}).whenComplete((r3,e3)->{
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("c的when运行结束");
});
completableFutureList.add(c);
CompletableFuture voidCompletableFuture
= CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]));
voidCompletableFuture.join();
System.out.println("结束了");
}
}
anyOf()
当传入的CompletableFuture中的任意一个执行完了,就返回哪个.
另:allOf仅针对CompletableFuture的当前线程执行完,例如:如果使用了whenCompleteAsync,只能保证supplyAsync执行完,whenCompleteAsync是否执行完allOf 不会管理cuiyaonan2000@163.com
package cui.yao.nan.completablefuture;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
/**
* @Author: cuiyaonan2000@163.com
* @Description: todo
* @Date: Created at 2022-1-13 9:57
*/
public class Test2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
List completableFutureList = new ArrayList();
CompletableFuture a = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("a运行结束");
return 1;
});
completableFutureList.add(a);
CompletableFuture b = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("b运行结束");
return 2;
});
completableFutureList.add(b);
CompletableFuture c = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("c运行结束");
return 3;
}).whenComplete((r3,e3)->{
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("c的when运行结束");
});
completableFutureList.add(c);
CompletableFuture voidCompletableFuture
= CompletableFuture.anyOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]));
System.out.println(voidCompletableFuture.join());
System.out.println("结束了");
}
}