您当前的位置: 首页 >  Java

cuiyaonan2000

暂无认证

  • 8浏览

    0关注

    248博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

JAVA:响应式编程

cuiyaonan2000 发布时间:2022-01-12 17:10:45 ,浏览量:8

序言

通常我们编写的Java程序,都是一行执行完了,再执行下一行.上一行没有执行完,下一行就不会执行.这种就是指令式编程.

另外一种就是响应式编程,既可以满足当前一行命令还没有执行完时,下一行命令就可以启动执行.当需要上一行命令的结果时再去获取相关值(或者上一行结果在执行完后主动的告诉我们)cuiyaonan2000@163.com

CompletableFuture

在Java8 之前就有Future提供了响应式编程的实现方式.但是有一些缺陷.所以提供了CompletableFuture,从这个名字上我们就能知道该新特性是比之前的Future更完善和强大的^_^.---其实如果你要自己实现也是可以的,使用多线程的方式Runnable来获取其它线程返回的值cuiyaonan2000@163.com

包目录

CompletableFuture 在 java.util.concurrent这个万众瞩目的目录下 .多少同学的面试必背目录cuiyaonan2000@163.com

构造方法

如下只有一个公有的.还有个私有的构造方法,但是我们没法调用.

示例--这种构造方法的创建对象,实际使用的时候比较少
package cui.yao.nan.completablefuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/**
 * @Author: cuiyaonan2000@163.com
 * @Description: todo
 * @Date: Created at 2022-1-12  15:29
 */
public class Test {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        //CompletableFuture 的构造方法只有这种
        CompletableFuture completableFuture = new CompletableFuture();

        //此处必须要设置,否则completableFuture.get()就会一直阻塞
        completableFuture.complete("cuiyaonan2000@163.com");

        //get()方法会一直阻塞直到 Future 完成.所以必须有上面这部模拟完成
        String result = completableFuture.get();

        System.out.println(result);


    }
}


异步任务

如上构造方法创建一个子任务的方式其实非常少用.

实际使用CompletableFuture使用的创建异步子任务的方式还是要靠如下的方法cuiyaonan2000@163.com

另:CompletableFuture的方法中 Async 后缀的均为异步操作

runAsync

由上runAsyn其实就是传入一个Runnable的对象,执行异步任务.

那Runnable是有啥子特点呢???就是没有返回值cuiyaonan2000@163.com

runAsync是Runnable任务,不带返回值的,如果入参有executor,则使用executor来执行异步任务

 CompletableFuture runAsync = CompletableFuture.runAsync(()->{
     System.out.println("cuiyaonan2000@163.com begin");
     try {
         Thread.sleep(1000);
     } catch (InterruptedException e) {
         e.printStackTrace();
     }
     System.out.println("cuiyaonan2000@163.com end");
 });


//如下的get很重要,如果没有get,则当主程序运行完了.子线程也会挂掉,即     System.out.println("cuiyaonan2000@163.com end");不会执行
 runAsync.get();

supplyAsync

supplyAsync是带返回值的,如果入参不带executor,则默认使用ForkJoinPool.commonPool()作为执行异步任务的线程池;否则使用executor执行任务。

主要区别就是线程中必须有return,可以参考Callable

  CompletableFuture supplyAsync = CompletableFuture.supplyAsync(()->{
      System.out.println("cuiyaonan2000@163.com begin");
      try {
          Thread.sleep(1000);
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
      System.out.println("cuiyaonan2000@163.com end");
      return "1";
  });

//如下的get很重要,如果没有get,则当主程序运行完了.子线程也会挂掉,即     System.out.println("cuiyaonan2000@163.com end");不会执行
System.out.println(supplyAsync.get());

任务触发器

该类方法主要是当异步任务完成后设置的,所以它是提供给任务的触发器,所包含的方法如下:

complete

直接让异步任务结束,同时可以这只一个返回值.

CompletableFuture whenComplete = CompletableFuture.supplyAsync(()->{
    System.out.println("cuiyaonan2000@163.com begin");
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
        System.out.println("cuiyaonan2000@163.com end");
        return "1";
    }).whenComplete((a,e)->{
         System.out.println("异步任务返回的结果是:" +a +"  抛出的异常是:" +e);
         int i =1/0;
    }).exceptionally(e->{
         System.out.println("异常信息是"+e.getCause());
         return "2";
     });
     
whenComplete.complete(11);
     
System.out.println(whenComplete.get());

completeExceptionally

这个更狠 直接让子任务异常退出,注意是异常退出.即使有异常捕获也不能抓到.

        CompletableFuture whenComplete = CompletableFuture.supplyAsync(()->{
            System.out.println("cuiyaonan2000@163.com begin");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("cuiyaonan2000@163.com end");
            return "1";
        }).whenComplete((a,e)->{
            System.out.println("异步任务返回的结果是:" +a +"  抛出的异常是:" +e);
            int i =1/0;
        }).exceptionally(e->{
            System.out.println("异常信息是"+e.getCause());
            return "2";
        });
        whenComplete.completeExceptionally(new Exception("121323"));
        System.out.println(whenComplete.get());

whenComplete

顾明思议当任务完成时触发.同时它提供了多钟,主要的有2种.即同步的,还是异步的.如果是同步的则继续使用supplyAsync的线程,如果不是同步的则从线程池中在拿出一个线程执行 

CompletableFuture whenComplete = CompletableFuture.supplyAsync(()->{
    System.out.println("cuiyaonan2000@163.com begin");
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("cuiyaonan2000@163.com end");
    return "1";
}).whenComplete((a,e)->{
    System.out.println("异步任务返回的结果是:" +a +"  抛出的异常是:" +e);
});

//如下的get很重要,如果没有get,则当主程序运行完了.子线程也会挂掉,即     System.out.println("cuiyaonan2000@163.com end");不会执行

whenComplete.get();

exceptionally

此即为try cahtch 的finally 接收异常信息,并返回一个结果

CompletableFuture whenComplete = CompletableFuture.supplyAsync(()->{
    System.out.println("cuiyaonan2000@163.com begin");
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
        System.out.println("cuiyaonan2000@163.com end");
        return "1";
   }).whenComplete((a,e)->{
       System.out.println("异步任务返回的结果是:" +a +"  抛出的异常是:" +e);
       int i =1/0;
   }).exceptionally(e->{
       System.out.println("异常信息是"+e.getCause());
       return "2";
   });


whenComplete.get();

handle()

API提供了一个更通用的方法 - handle()从异常恢复,无论一个异常是否发生它都会被调用。

  • 如果异常发生:结果参数将是 null,异常参数有值
  • 如果正常执行:异常参数为null,结果参数有值

 

thenApply

runAsync或者supplyAsync的任务在get()时整个主进程都会阻塞,除非运行结束.所以可以只用thenApply在包一层.

thenApply会获取supplyAsync或者runAsync的返回值,然后在处理后在返回一个.感觉没毛用.因为thenapply也必须要get.

同理区分同步的,还是异步的.如果是同步的则继续使用supplyAsync的线程,如果不是同步的则从线程池中在拿出一个线程执行 

另:thenApply返回的是另外一个CompletableFuture.

        CompletableFuture thenApply = CompletableFuture.supplyAsync(()->{
            System.out.println("cuiyaonan2000@163.com thenApply  begin");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("cuiyaonan2000@163.com thenApply end");
            return "1";
        }).thenApply((a)->{
            return "异步调用返回a"+a;
        }).thenApply((a)->{
            return "异步调用返回a"+a;
        });
        System.out.println(thenApply.get());

thenAccept与 thenRun

thenAccept 与 thenRun 也是返回另外一个CompletableFuture

跟thenApply不同之处在于,不会返回值,只是做一些处理.

// thenAccept() example
CompletableFuture.supplyAsync(() -> {
    return ProductService.getProductDetail(productId);
}).thenAccept(product -> {
    System.out.println("Got product detail from remote service " + product.getName())
});




// thenRun() example
CompletableFuture.supplyAsync(() -> {
    // Run some computation  
}).thenRun(() -> {
    // Computation Finished.
});

thenCompose(类似于thenApply)

区别与thenApply是将方法体内返回内容组装成completableFuture返回,若果方法体中返回的本身就是completablefuture则返回 CompletableFuture 这种结果

thenCompose则去掉CompletableFutre嵌套,只返回CompletableFuture

区别如下:

package cui.yao.nan.completablefuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;

/**
 * @Author: cuiyaonan2000@163.com
 * @Description: todo
 * @Date: Created at 2022-1-13  9:57
 */
public class Test2 {

    public static void main(String[] args){

        CompletableFuture a = CompletableFuture.supplyAsync(()->{
            return 1;
        }).thenApply(result->{
            return 2;
        });

        CompletableFuture b = CompletableFuture.supplyAsync(()->{
            return 1;
        }).thenApply(result->{
            return CompletableFuture.supplyAsync(()->{
                return 3;
            });
        });

        CompletableFuture c = CompletableFuture.supplyAsync(()->{
            return 1;
        }).thenCompose(result->{
            return CompletableFuture.supplyAsync(()->{
                return 3;
            });
        });

        try {
            //这里必须连续使用2次get()才能获取到结果
           System.out.println( b.get().get());

           //这里使用1次get()就能获取返回结果
           System.out.println( c.get());

        } catch (InterruptedException e) {
            
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

    }

}

thenCombine

thenCombine常用于将2个CompletableFuture返回的结果进行组装,即2个

2是该方法的主要特征cuiyaonan2000@163.com

package cui.yao.nan.completablefuture;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;

/**
 * @Author: cuiyaonan2000@163.com
 * @Description: todo
 * @Date: Created at 2022-1-13  9:57
 */
public class Test2 {

    public static void main(String[] args){

        CompletableFuture a = CompletableFuture.supplyAsync(()->{
            return 1;
        }).thenCombine(CompletableFuture.supplyAsync(()->{
            return 2;
        }),(result1,result2)->{
            return result1+ result2;
        });

        try {
            System.out.println(a.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

}

allOf

用于处理收集所有的结果

如下所示虽然可以同时收集多个CompletableFuture,但是返回的是CompletableFuture

所以不能直接处理每个CompletableFuture的返回结果,但是设计上可以灵活变通,利用allOf当数组的内的所有CompletableFuture全都执行完了才会执行thenApply,来遍历原始的数组通过get()获取每个CompletableFuture的值并加以处理----------原理就是等所有的CompletableFuture全都处理完了,统一处理结果值,因为allOf的get必须是,所有的CompletableFuture全都执行完了才能继续执行cuiyaonan2000@163.com

另:allOf仅针对CompletableFuture的当前线程执行完,例如:如果使用了whenCompleteAsync,只能保证supplyAsync执行完,whenCompleteAsync是否执行完allOf 不会管理cuiyaonan2000@163.com

实例:

package cui.yao.nan.completablefuture;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;

/**
 * @Author: cuiyaonan2000@163.com
 * @Description: todo
 * @Date: Created at 2022-1-13  9:57
 */
public class Test2 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        List completableFutureList = new ArrayList();

        CompletableFuture a = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("a运行结束");
            return 1;
        });
        completableFutureList.add(a);

        CompletableFuture b = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("b运行结束");
            return 2;
        });
        completableFutureList.add(b);


        CompletableFuture c = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("c运行结束");
            return 3;
        }).whenComplete((r3,e3)->{
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("c的when运行结束");
        });
        completableFutureList.add(c);

        CompletableFuture voidCompletableFuture
                = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]));

        voidCompletableFuture.join();

        System.out.println("结束了");



    }

}

anyOf()

当传入的CompletableFuture中的任意一个执行完了,就返回哪个.

另:allOf仅针对CompletableFuture的当前线程执行完,例如:如果使用了whenCompleteAsync,只能保证supplyAsync执行完,whenCompleteAsync是否执行完allOf 不会管理cuiyaonan2000@163.com

 

package cui.yao.nan.completablefuture;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;

/**
 * @Author: cuiyaonan2000@163.com
 * @Description: todo
 * @Date: Created at 2022-1-13  9:57
 */
public class Test2 {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        List completableFutureList = new ArrayList();

        CompletableFuture a = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("a运行结束");
            return 1;
        });
        completableFutureList.add(a);

        CompletableFuture b = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("b运行结束");
            return 2;
        });
        completableFutureList.add(b);


        CompletableFuture c = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("c运行结束");
            return 3;
        }).whenComplete((r3,e3)->{
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("c的when运行结束");
        });
        completableFutureList.add(c);

        CompletableFuture voidCompletableFuture
                = CompletableFuture.anyOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]));

        System.out.println(voidCompletableFuture.join());

        System.out.println("结束了");



    }

}

关注
打赏
1638267374
查看更多评论
立即登录/注册

微信扫码登录

0.0386s