您当前的位置: 首页 > 

Dongguo丶

暂无认证

  • 2浏览

    0关注

    472博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

CompletableFuture异步回调

Dongguo丶 发布时间:2021-08-25 09:03:03 ,浏览量:2

CompletableFuture

CompletableFuture在Java里面被用于异步编程,异步通常意味着非阻塞,可以使得我们的任务单独运行在与主线程分离的其他线程中,并且通过回调可以在主线程中得到异步任务的执行状态,是否完成,和是否异常等信息。 CompletableFuture实现了Future, CompletionStage接口,实现了Future接口就可以兼容现在有线程池框架,而CompletionStage接口才是异步编程的接口抽象,里面定义多种异步方法,通过这两者集合,从而打造出了强大的CompletableFuture类。

Future与CompletableFuture

Futrue在Java里面,通常用来表示一个异步任务的引用,比如我们将任务提交到线程池里面,然后我们会得到一个Futrue,在Future里面有isDone方法来 判断任务是否处理结束,还有get方法可以一直阻塞直到任务结束然后获取结果,但整体来说这种方式,还是同步的,因为需要客户端不断阻塞等待或者不断轮询才能知道任务是否完成。 Future的主要缺点如下: (1)不支持手动完成 我提交了一个任务,但是执行太慢了,我通过其他路径已经获取到了任务结果,现在没法把这个任务结果通知到正在执行的线程,所以必须主动取消或者一直等待它执行完成 (2)不支持进一步的非阻塞调用 通过Future的get方法会一直阻塞到任务完成,但是想在获取任务之后执行额外的任务,因为Future不支持回调函数,所以无法实现这个功能 (3)不支持链式调用 对于Future的执行结果,我们想继续传到下一个Future处理使用,从而形成一个链式的pipline调用,这在Future中是没法实现的。 (4)不支持多个Future合并 比如我们有10个Future并行执行,我们想在所有的Future运行完毕之后,执行某些函数,是没法通过Future实现的。 (5)不支持异常处理 Future的API没有任何的异常处理的api,所以在异步运行时,如果出了问题是不好定位的。

使用CompletableFuture

场景:主线程里面创建一个CompletableFuture,然后主线程调用get方法会阻塞,执行完子线程再执行主线程

没有返回值的异步任务
package com.dongguo.completable;

import java.util.concurrent.CompletableFuture;

/**
 * @author Dongguo
 * @date 2021/8/24 0024-22:36
 * @description: 没有返回值的异步任务
 */
public class CompletableFutureDemo1 {
    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        //运行一个没有返回值的异步任务
        CompletableFuture future = CompletableFuture.runAsync(() -> {
            try {
                System.out.println("子线程启动");
                Thread.sleep(5000);
                System.out.println("子线程完成");
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        //主线程阻塞
        future.get();
        System.out.println("主线程结束");
    }
}
运行结果:
主线程开始
子线程启动
子线程完成
主线程结束
有返回值的异步任务
package com.dongguo.completable;

import java.util.concurrent.CompletableFuture;

/**
 * @author Dongguo
 * @date 2021/8/25 0025-7:56
 * @description: 有返回值的异步任务
 */
public class CompletableFutureDemo2 {
    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("子线程启动");
                Thread.sleep(5000);

            } catch (Exception e) {
                e.printStackTrace();
            }
            return "子线程完成";
        });
        //主线程阻塞
        String result = future.get();
        System.out.println("主线程结束,子线程结果为:" + result);
    }
}
运行结果:
主线程开始
子线程启动
主线程结束,子线程结果为:子线程完成
线程依赖

当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。

package com.dongguo.completable;

import java.util.concurrent.CompletableFuture;

/**
 * @author Dongguo
 * @date 2021/8/25 0025-8:03
 * @description: 先对一个数加10, 然后取平方
 */
public class CompletableFutureDemo3 {
    private static Integer num = 10;

    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("子线程启动 执行加10");
                num += 10;
            } catch (Exception e) {
                e.printStackTrace();
            }
            return num;
        }).thenApply((i) -> {
            return i * i;
        });
        //主线程阻塞
        Integer result = future.get();
        System.out.println("主线程结束,子线程结果为:" + result);
    }
}
运行结果:
主线程开始
子线程启动 执行加10
主线程结束,子线程结果为:400
消费处理结果

thenAccept 消费处理结果, 接收任务的处理结果,并消费处理,无返回结果。

package com.dongguo.completable;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/**
 * @author Dongguo
 * @date 2021/8/25 0025-8:08
 * @description: 消费处理结果thenAccept
 */
public class CompletableFutureDemo4 {
    private static Integer num = 10;

    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("子线程启动 执行加10");
                num += 10;
            } catch (Exception e) {
                e.printStackTrace();
            }
            return num;
        }).thenApply((i) -> {
            return i * i;
        }).thenAccept(new Consumer(){

            @Override
            public void accept(Integer integer) {
                System.out.println("子线程全部处理完成,最后调用了accept,结果为:" + integer);
            }
        });
        //主线程阻塞

        System.out.println("主线程结束");
    }
}
运行结果:
主线程开始
子线程启动 执行加10
子线程全部处理完成,最后调用了accept,结果为:400
主线程结束
异常处理

exceptionally异常处理,出现异常时触发

package com.dongguo.completable;

import java.util.concurrent.CompletableFuture;

/**
 * @author Dongguo
 * @date 2021/8/25 0025-8:12
 * @description:
 */
public class CompletableFutureDemo5 {
    private static Integer num = 10;

    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        CompletableFuture future = CompletableFuture.supplyAsync(() -> {

            System.out.println("子线程启动");
            int i = 1 / 0;
            num += 10;
            return num;
        }).exceptionally((e) -> {
            System.out.println(e.getMessage());
            return -1;
        });
        //主线程阻塞
        int result = future.get();
        System.out.println("主线程结束,子线程结果为:" + result);
    }
}
运行结果:
主线程开始
子线程启动
java.lang.ArithmeticException: / by zero
主线程结束,子线程结果为:-1

handle类似于thenAccept/thenRun方法,是最后一步的处理调用,但是同时可以处理异常

package com.dongguo.completable;

import java.util.concurrent.CompletableFuture;

/**
 * @author Dongguo
 * @date 2021/8/25 0025-8:16
 * @description:
 */
public class CompletableFutureDemo6 {
    private static Integer num = 10;

    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        CompletableFuture future = CompletableFuture.supplyAsync(() -> {

            System.out.println("子线程启动");
            int i = 1 / 0;
            num += 10;
            return num;
        }).handle((i, e) -> {
            if (e != null) {
                System.out.println("发生了异常,内容为:" + e.getMessage());
                return -1;
            } else {
                System.out.println("正常完成,内容为: " + i);
                return i;
            }
        });
        //主线程阻塞
        int result = future.get();
        System.out.println("主线程结束,子线程结果为:" + result);
    }
}
主线程开始
子线程启动
发生了异常,内容为:java.lang.ArithmeticException: / by zero
主线程结束,子线程结果为:-1
结果合并

thenCompose合并两个有依赖关系的CompletableFutures的执行结果

package com.dongguo.completable;

import java.util.concurrent.CompletableFuture;

/**
 * @author Dongguo
 * @date 2021/8/25 0025-8:25
 * @description:
 */
public class CompletableFutureDemo7 {
    private static Integer num = 10;

    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
            System.out.println("子线程启动");
            num += 10;
            return num;
        });
        合并
        CompletableFuture future2 = future.thenCompose(i ->
                CompletableFuture.supplyAsync(() -> {
                    return i + 1;
                }));
        //主线程阻塞
        int result = future.get();
        int result2 = future2.get();

        System.out.println("主线程结束,子线程结果为:" + result);
        System.out.println("合并后结果为:"+result2);
    }
}
运行结果:
主线程开始
子线程启动
主线程结束,子线程结果为:20
合并后结果为:21

thenCombine合并两个没有依赖关系的CompletableFutures任务

package com.dongguo.completable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;

/**
 * @author Dongguo
 * @date 2021/8/25 0025-8:30
 * @description:
 */
public class CompletableFutureDemo8 {
    private static Integer num = 10;

    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
            System.out.println("子线程启动执行加10");
            num += 10;
            return num;
        });
        CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("子线程启动执行乘10");
            num = num * 10;
            return num;
        });
        //合并两个结果
        CompletableFuture futureTotal = future.thenCombine(future2, new BiFunction() {
            @Override
            public List apply(Integer a, Integer b) {
                List list = new ArrayList();
                list.add(a);
                list.add(b);
                return list;
            }
        });
        System.out.println("子线程启动执行加10结果为"+future.get());
        System.out.println("子线程启动执行乘10结果为"+future2.get());
        System.out.println("主线程结束,合并后结果为:" + futureTotal.get());
    }
}
运行结果:
主线程开始
子线程启动执行加10
子线程启动执行乘10
子线程启动执行加10结果为20
子线程启动执行乘10结果为200
主线程结束,合并后结果为:[20, 200]

合并多个任务的结果allOf与anyOf allOf: 一系列独立的future任务,等其所有的任务执行完后做一些事情

package com.dongguo.completable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

/**
 * @author Dongguo
 * @date 2021/8/25 0025-8:39
 * @description:
 */
public class CompletableFutureDemo9 {
    private static Integer num = 10;

    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        List list = new ArrayList();
        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
            System.out.println("子线程启动执行加10");
            num += 10;
            return num;
        });
        list.add(future);
        CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("子线程启动执行乘10");
            num = num * 10;
            return num;
        });
        list.add(future2);
        CompletableFuture future3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("子线程启动执行减10");
            num -= 10;
            return num;
        });
        list.add(future3);
        CompletableFuture future4 = CompletableFuture.supplyAsync(() -> {
            System.out.println("子线程启动执行除以10");
            num = num / 10;
            return num;
        });
        list.add(future4);
        //多任务合并
        List collect = list.stream().map(CompletableFuture::join).collect(Collectors.toList());
        System.out.println("主线程结束,合并后结果为:" + collect);
    }
}
运行结果:
主线程开始
子线程启动执行加10
子线程启动执行乘10
子线程启动执行减10
子线程启动执行除以10
主线程结束,合并后结果为:[20, 200, 190, 19]

anyOf: 只要在多个future里面有一个返回,整个任务就可以结束,而不需要等到每一个future结束

package com.dongguo.completable;

import java.util.concurrent.CompletableFuture;


/**
 * @author Dongguo
 * @date 2021/8/25 0025-8:46
 * @description:
 */
public class CompletableFutureDemo10 {
    private static Integer num = 10;

    public static void main(String[] args) throws Exception {
        System.out.println("主线程开始");
        CompletableFuture[] futures = new CompletableFuture[4];

        CompletableFuture future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(5000);
                System.out.println("子线程启动执行加10");
                num += 10;
                return num;
            } catch (InterruptedException e) {
                e.printStackTrace();
                return 0;
            }
        });
        futures[0] = future;
        CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(1000);
                System.out.println("子线程启动执行乘10");
                num = num * 10;
                return num;
            } catch (InterruptedException e) {
                e.printStackTrace();
                return 0;
            }
        });
        futures[1] = future2;
        CompletableFuture future3 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(3000);
                System.out.println("子线程启动执行减10");
                num -= 10;
                return num;
            } catch (InterruptedException e) {
                e.printStackTrace();
                return 0;
            }
        });
        futures[2] = future3;
        CompletableFuture future4 = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(4000);
                System.out.println("子线程启动执行除以10");
                num = num / 10;
                return num;
            } catch (InterruptedException e) {
                e.printStackTrace();
                return 0;
            }
        });
        futures[3] = future4;
        //多任务合并 有一个任务完成就返回
        CompletableFuture futureOne = CompletableFuture.anyOf(futures);
        System.out.println("主线程结束,合并后结果为:" + futureOne.get());
    }
}
运行结果:
主线程开始
子线程启动执行乘10
主线程结束,合并后结果为:100
关注
打赏
1638062488
查看更多评论
立即登录/注册

微信扫码登录

0.0425s