您当前的位置: 首页 > 

恐龙弟旺仔

暂无认证

  • 0浏览

    0关注

    282博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

Netty源码解析-Future与Promise

恐龙弟旺仔 发布时间:2021-12-28 19:50:43 ,浏览量:0

前言:

    所谓异步编程,那我们首先要明白什么是同步编程?

    我们常见的编程手段基本都是基于同步策略,client发起一次方法调用,server(也可以是与client处于同一JVM的一个方法)处理请求,处理完成后返回响应给client。client在接收到响应之前处于阻塞状态。整个过程有点像之前 IO简述时,针对BIO、NIO和AIO的分析过程。当然这是基于操作系统层面的,我们今天分析的主要是基于应用层面的。

    同步编程示例图如下:

 

    异步编程时,client在发起一起请求后,立即返回一个结果Future,等server(方法)执行完成后,(执行针对client的回调方法),client获取到对应结果值。在整个过程中client是不发生阻塞的。

    异步编程示例图如下:

 

1.JDK提供的Future异步方案
JDK6已经提供了基于Future的异步方案,示例如下:
public class SimpleFutureDemo {

    public static void main(String[] args) throws Throwable, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        // 使用线程池来执行callable任务
        Future f = executor.submit(new Callable() {

            @Override
            public String call() throws Exception {
                // 模拟任务执行
                System.out.println("begin...");
                Thread.sleep(2000);
                System.out.println("end...");
                return "job executed ";
            }
        });

        // 获取执行结果
        System.out.println(f.get());
        System.out.println("main thread end...");
    }
}
// 执行结果如下:
begin...
// 在等待2秒之后才打印出来end
end...
job executed 
// 主线程的这句最后才打印出来,说明任务的执行获取结果集影响到主线程的执行
main thread end...
我们来看下java.util.concurrent.Future提供的方法
public interface Future {
    // 取消任务
    boolean cancel(boolean mayInterruptIfRunning);
    // 任务是否被取消
    boolean isCancelled();
    
    // 任务是否已完成
    boolean isDone();
    // 获取结果集
    V get() throws InterruptedException, ExecutionException;
    // 在指定时间获取结果集
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

总结:根据上文中的示例,我们在将callable任务提交给ExecutorService后,立即就获取一个Future的结果集,当前主线程不必阻塞等待执行。

但是,当我们调用future.get()方法时,发生了阻塞,一直阻塞到方法执行完成,这里是不太符合我们的预期的。

当然,我们也可以使用另外一种方式,

while (!f.isDone()) {
   // sleep(100);
}
// 获取执行结果
System.out.println(f.get());

不断通过轮询来判断future任务是否已经执行完成。

但是这种方案缺点显而易见,就是主线程一直在执行状态(如果没有sleep的话),CPU消耗过高,那么有没有更好的方案呢?

2.JDK提供的CompletableFuture方案

    在JDK8中,提供了CompletableFuture方案,该方案真正的实现了异步调用,我们不必要阻塞获取结果集,或者不断轮询是否任务已经执行完成,而是通过注册回调函数的方案来实现结果集的获取。

public class SimplePromiseDemo {

    public static void main(String[] args) throws Throwable, ExecutionException {
        // 两个线程的线程池
        ExecutorService executor = Executors.newFixedThreadPool(1);
        //jdk1.8之前的实现方式
        CompletableFuture future = CompletableFuture.supplyAsync(new Supplier() {
            @Override
            public String get() {
                System.out.println("begin...");
                try {
                    //模拟耗时操作
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("end...");
                return "job executed ";
            }
        }, executor);

        //注册future执行成功后的回调函数
        future.thenAccept(e -> System.out.println(e));
        System.out.println("main thread end...");
    }
}

// 打印结果
begin...
// 主线程直接结束,阻塞任务的执行没有影响到主线程
main thread end...
// 2秒后打印出来,
end...
job executed 

总结:结果对比还是比较明显的,在CompletableFuture这种注册回调函数的异步调用方案下,主线程的方法执行没有被callable任务所阻塞,在任务执行完成后,主动执行了future.thenAccept中注册的任务。

这种才是真正的异步执行。

3.Netty提供的异步方案

    作为Netty而言,并没有使用CompletableFuture的方案,而是自己实现了一套异步执行方案。

    我们先来看一下示例代码:

public class NettyPromiseTest {
    public static void main(String[] args) throws Throwable {
        // 使用Netty自定义的线程池组
        EventExecutorGroup group = new DefaultEventExecutorGroup(1);
        Future f = group.submit(new Callable() {

            @Override
            public String call() {
                System.out.println("begin...");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("end...");
                return "job executed";
            }
        });

        //注册监听,也就是上面的成功回调方法
        f.addListener(new FutureListener() {
            @Override
            public void operationComplete(Future f) throws Exception {
                System.out.println(f.get());
            }
        });

        System.out.println("main thread end...");
    }
}

// 打印结果如下:
begin...
// 没有阻塞,main线程直接结束
main thread end...
    
// 2秒后打印出来
end...
job executed

总结:Netty提供的这种异步方案,与上面2中的CompletableFuture比较相似,都是方法调用后返回一个Future,future注册一个回调函数,用于方法完成后的回调。

这些均不影响主线程的后续方法操作。

4.Netty异步策略源码分析

    Netty提供的这种异步策略,还是比较适合在我们实际的工作中的使用的,那它是怎么实现的呢?怎么完成从同步调用到异步调用的转换的呢?我们来看下。

4.1 io.netty.util.concurrent.Future及其实现类
public interface Future extends java.util.concurrent.Future {
 
    // 以下是抽取的重要的方法
    
    // 任务执行是否成功
    boolean isSuccess();
    // 添加监听方法
    Future addListener(GenericFutureListener            
关注
打赏
1655041699
查看更多评论
0.0363s