您当前的位置: 首页 > 

庄小焱

暂无认证

  • 1浏览

    0关注

    805博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

并发编程——ThreadPool原理

庄小焱 发布时间:2020-10-07 19:47:20 ,浏览量:1

摘要

线程池(Thread Pool)是一种基于池化思想管理线程的工具,经常出现在多线程服务器中,如MySQL。线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。

一、线程池的作用

线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:

  1. 频繁申请/销毁资源和调度资源,将带来额外的消耗,可能会非常巨大。
  2. 对资源无限申请缺少抑制手段,易引发系统资源耗尽的风险。
  3. 系统无法合理管理内部的资源分布,会降低系统的稳定性。

为解决资源分配这个问题,线程池采用了“池化”(Pooling)思想。池化,顾名思义,是为了最大化收益并最小化风险,而将资源统一在一起管理的一种思想。池化”思想不仅仅能应用在计算机领域,在金融、设备、人员管理、工作管理等领域也有相关的应用。在计算机领域中的表现为:统一管理IT资源,包括服务器、存储、和网络资源等等。通过共享资源,使用户在低投入中获益。除去线程池,还有其他比较典型的几种使用策略包括:

  1. 内存池(Memory Pooling):预先申请内存,提升申请内存速度,减少内存碎片。
  2. 连接池(Connection Pooling):预先申请数据库连接,提升申请连接的速度,降低系统的开销。
  3. 实例池(Object Pooling):循环使用对象,减少资源在初始化和释放时的昂贵损耗。

Java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来3个好处。

  • 第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 第三:提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
二、线程池的基本使用

线程池的使用和创建可以说非常的简单,这得益于JDK提供给我们良好封装的API。线程池的实现被封装到了ThreadPoolExecutor中,我们可以通过ThreadPoolExecutor的构造方法来实例化出一个线程池,代码如下:

// 实例化一个线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 10, 60,
        TimeUnit.SECONDS, new ArrayBlockingQueue(20));

// 使用线程池执行一个任务        
executor.execute(() -> {
    // Do something
});

// 关闭线程池,会阻止新任务提交,但不影响已提交的任务
executor.shutdown();

// 关闭线程池,阻止新任务提交,并且中断当前正在运行的线程
executor.showdownNow();

创建好线程池后直接调用execute方法并传入一个Runnable参数即可将任务交给线程池执行,通过shutdown/shutdownNow方法可以关闭线程池。

ThreadPoolExecutor的构造方法中参数众多,对于初学者而言在没有了解各个参数的作用的情况下很难去配置合适的线程池。因此Java还为我们提供了一个线程池工具类Executors来快捷的创建线程池。Executors提供了很多简便的创建线程池的方法,举两个例子,代码如下:

// 实例化一个单线程的线程池
ExecutorService singleExecutor = Executors.newSingleThreadExecutor();

// 创建固定线程个数的线程池
ExecutorService fixedExecutor = Executors.newFixedThreadPool(10);

// 创建一个可重用固定线程数的线程池
ExecutorService executorService2 = Executors.newCachedThreadPool();

但是,通常来说在实际开发中并不推荐直接使用Executors来创建线程池,而是需要根据项目实际情况配置适合自己项目的线程池。

三、线程池的生命周期

线程池从诞生到死亡,中间会经历RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED五个生命周期状态。

  • RUNNING 表示线程池处于运行状态,能够接受新提交的任务且能对已添加的任务进行处理。RUNNING状态是线程池的初始化状态,线程池一旦被创建就处于RUNNING状态。

  • SHUTDOWN 线程处于关闭状态,不接受新任务,但可以处理已添加的任务。RUNNING状态的线程池调用shutdown后会进入SHUTDOWN状态。

  • STOP 线程池处于停止状态,不接收任务,不处理已添加的任务,且会中断正在执行任务的线程。RUNNING状态的线程池调用了shutdownNow后会进入STOP状态。

  • TIDYING 当所有任务已终止,且任务数量为0时,线程池会进入TIDYING。当线程池处于SHUTDOWN状态时,阻塞队列中的任务被执行完了,且线程池中没有正在执行的任务了,状态会由SHUTDOWN变为TIDYING。当线程处于STOP状态时,线程池中没有正在执行的任务时则会由STOP变为TIDYING。

  • TERMINATED 线程终止状态。处于TIDYING状态的线程执行terminated()后进入TERMINATED状态。

根据上述线程池生命周期状态的描述,可以画出如下所示的线程池生命周期状态流程示意图。

四、线程池的核心参数

使用ThreadPoolExecutor的构造方法来创建了一个线程池。其实在ThreadPoolExecutor中有多个构造方法,但是最终都调用到了下边代码中的这一个构造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        // ...省略校验相关代码
        
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
    
    // ...    

}

这个构造方法中有7个参数之多,我们逐个来看每个参数所代表的含义:


corePoolSize:表示线程池的核心线程数。当有任务提交到线程池时,如果线程池中的线程数小于corePoolSize,那么则直接创建新的线程来执行任务。


workQueue:任务队列,它是一个阻塞队列,用于存储来不及执行的任务的队列。当有任务提交到线程池的时候,如果线程池中的线程数大于等于corePoolSize,那么这个任务则会先被放到这个队列中,等待执行。


maximumPoolSize:表示线程池支持的最大线程数量。当一个任务提交到线程池时,线程池中的线程数大于corePoolSize,并且workQueue已满,那么则会创建新的线程执行任务,但是线程数要小于等于maximumPoolSize。


keepAliveTime:核心线程数默认是没有超时时间,非核心线程数有(超时时间:若工作线程数大于核心线程数,且从等待队列中超时获取,则把当前线程移除)


unit:非核心线程空闲时保持存活的时间的单位


threadFactory:创建线程的工厂,可以在这里统一处理创建线程的属性


handler:拒绝策略,当线程池中的线程达到maximumPoolSize线程数后且workQueue已满的情况下,再向线程池提交任务则执行对应的拒绝策略
五、线程池工作流程

从图中可以看出,当提交一个新任务到线程池时,线程池的处理流程如下。

  • 线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。
  • 线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。
  • 线程池判断线程池的线程是否都处工作状态。如果没有,则创建新的工作线程来执行任务。如果已经满了,交给饱和策略来处理这个任务。ThreadPool执行execute()方法示意图。

 任务缓冲:任务缓冲模块是线程池能够管理任务的核心部分。线程池的本质是对任务和线程的管理,而做到这一点最关键的思想就是将任务和线程两者解耦,不让两者直接关联,才可以做后续的分配工作。线程池中是以生产者消费者模式,通过一个阻塞队列来实现的。阻塞队列缓存任务,工作线程从阻塞队列中获取任务。

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作是:在队列为空时,获取元素的线程会等待队列变为非空。当队列满时,存储元素的线程会等待队列可用。阻塞队列常用于生产者和消费者的场景,生产者是往队列里添加元素的线程,消费者是从队列里拿元素的线程。阻塞队列就是生产者存放元素的容器,而消费者也只从容器里拿元素。

使用不同的队列可以实现不一样的任务存取策略。在这里,我们可以再介绍下阻塞队列的成员:

任务申请:由上文的任务分配部分可知,任务的执行有两种可能:一种是任务直接由新创建的线程执行。另一种是线程从任务队列中获取任务然后执行,执行完任务的空闲线程会再次去从队列中申请任务再去执行。第一种情况仅出现在线程初始创建的时候,第二种是线程获取任务绝大多数的情况。线程需要从任务缓存模块中不断地取任务执行,帮助线程从阻塞队列中获取任务,实现线程管理模块和任务管理模块之间的通信。这部分策略由getTask方法实现,其执行流程如下图所示:

getTask这部分进行了多次判断,为的是控制线程的数量,使其符合线程池的状态。如果线程池现在不应该持有那么多线程,则会返回null值。工作线程Worker会不断接收新任务去执行,而当工作线程Worker接收不到任务的时候,就会开始被回收。

六、线程池的拒绝策略

如果线程池中的线程数达到了maximumPoolSize,并且workQueue队列存储满的情况下,线程池会执行对应的拒绝策略。在JDK中提供了RejectedExecutionHandler接口来执行拒绝操作。实现RejectedExecutionHandler的类有四个,对应了四种拒绝策略。分别如下:

  • DiscardPolicy:当提交任务到线程池中被拒绝时,线程池会丢弃这个被拒绝的任务

  • DiscardOldestPolicy:当提交任务到线程池中被拒绝时,线程池会丢弃等待队列中最老的任务。

  • CallerRunsPolicy:当提交任务到线程池中被拒绝时,会在线程池当前正在运行的Thread线程中处理被拒绝额任务。即哪个线程提交的任务哪个线程去执行。

  • AbortPolicy:当提交任务到线程池中被拒绝时,直接抛出RejectedExecutionException异常。

七、线程池的线程数配置

要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。 任务的性质:

  • CPU密集型任务
  • IO密集型任务和混合型任务。

性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务应配置尽可能小的线程,如配置N:cpu+1个线程的线程池。由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*N cpu。混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先执行。

注意:如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让执行时间短的任务先执行。依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越长,则CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU。建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千。有一次,我们系统里后台任务线程池的队列和线程池全满了,不断抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞,任务积压在线程池里。如果当时我们设置成无界队列,那么线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。当然,我们的系统所有的任务是用单独的服务器部署的,我们使用不同规模的线程池完成不同类型的任务,但是出现这样问题时也会影响到其他任务。

八、线程池的源码分析 8.1 线程池中的位运算

在向线程池提交任务时有两个比较中要的参数会决定任务的去向,这两个参数分别是线程池的状态和线程池中的线程数。在ThreadPoolExecutor内部使用了一个AtomicInteger类型的整数ctl来表示这两个参数,代码如下:

public class ThreadPoolExecutor extends AbstractExecutorService {
    // Integer.SIZE = 32.所以 COUNT_BITS= 29
    private static final int COUNT_BITS = Integer.SIZE - 3;

    // 00011111 11111111 11111111 11111111 这个值可以表示线程池的最大线程容量
    private static final int COUNT_MASK = (1  largestPoolSize)
                            largestPoolSize = s;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    // 开启线程执行任务
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

这部分逻辑其实比较容易理解,就是创建Worker并开启线程执行任务的过程,Worker是对线程的封装,创建的worker会被添加到ThreadPoolExecutor中的HashSet中。也就是线程池中的线程都维护在这个名为workers的HashSet中并被ThreadPoolExecutor所管理,HashSet中的线程可能处于正在工作的状态,也可能处于空闲状态,一旦达到指定的空闲时间,则会根据条件进行回收线程。

我们知道,线程调用start后就会开始执行线程的逻辑代码,执行完后线程的生命周期就结束了,那么线程池是如何保证Worker执行完任务后仍然不结束的呢?当线程空闲超时或者关闭线程池又是怎样进行线程回收的呢?这个实现逻辑其实就在Worker中。看下Worker的代码:

 private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        // 执行任务的线程
        final Thread thread;
        // 初始化Worker时传进来的任务,可能为null,如果不空,
        // 则创建和立即执行这个task,对应核心线程创建的情况
        Runnable firstTask;

        Worker(Runnable firstTask) {
            // 初始化时设置setate为-1
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            // 通过线程工程创建线程
            this.thread = getThreadFactory().newThread(this);
        }
        
        // 线程的真正执行逻辑
        public void run() {
            runWorker(this);
        }
        
        // 判断线程是否是独占状态,如果不是意味着线程处于空闲状态
        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        // 获取锁
        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }
        // 释放锁
        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }
        // ...
    }

Worker是位于ThreadPoolExecutor中的一个内部类,它继承了AQS,使用AQS来实现了独占锁的功能,但是并没支持可重入。这里使用不可重入的特性来表示线程的执行状态,即可以通过isHeldExclusively方法来判断,如果是独占状态,说明线程正在执行任务,如果非独占状态,说明线程处于空闲状态。

另外,Worker还实现了Runnable接口,因此它的执行逻辑就是在run方法中,run方法调用的是线程池中的runWorker(this)方法。任务的执行逻辑就在runWorker方法中,它的代码如下:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        // 取出Worker中的任务,可能为空
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            // task不为null或者阻塞队列中有任务,通过循环不断的从阻塞队列中取出任务执行
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // ...
                try {
                    // 任务执行前的hook点
                    beforeExecute(wt, task);
                    try {
                        // 执行任务
                        task.run();
                        // 任务执行后的hook点
                        afterExecute(task, null);
                    } catch (Throwable ex) {
                        afterExecute(task, ex);
                        throw ex;
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            // 超时没有取到任务,则回收空闲超时的线程
            processWorkerExit(w, completedAbruptly);
        }
    }

可以看到,runWorker的核心逻辑就是不断通过getTask方法从阻塞队列中获取任务并执行.通过这样的方式实现了线程的复用,避免了创建线程。这里要注意的是这里是一个“生产者-消费者”模式,getTask是从阻塞队列中取任务,所以如果阻塞队列中没有任务的时候就会处于阻塞状态。getTask中通过判断是否要回收线程而设置了等待超时时间,如果阻塞队列中一直没有任务,那么在等待keepAliveTime时间后会返回一个null。最终会走到上述代码的finally方法中,意味着有线程空闲时间超过了keepAliveTime时间,那么调用processWorkerExit方法移除Worker。

    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            // ...
           

            // Flag1. 如果配置了allowCoreThreadTimeOut==true或者线程池中的
            // 线程数大于核心线程数,则timed为true,表示开启指定线程超时后被回收
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            
            // ...

            try {
                // Flag2. 取出阻塞队列中的任务,注意如果timed为true,则会调用阻塞队列的poll方法,
                // 并设置超时时间为keepAliveTime,如果超时没有取到任务则会返回null。
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

重点看getTask是如何处理空闲超时的逻辑的。我们知道,回收线程的条件是线程大于核心线程数或者配置了allowCoreThreadTimeOut为true,当线程空闲超时的情况下就会回收线程。上述代码在Flag1处先判断了如果线程池中的线程数大于核心线程数,或者开启了allowCoreThreadTimeOut,那么就需要开启线程空闲超时回收。所有在Flag2处,timed为true的情况下调用了阻塞队列的poll方法,并传入了超时时间为keepAliveTime,poll方法是一个阻塞方法,在没有任务时候回进行阻塞。如果在keepAliveTime时间内,没有获取到任务,那么poll方法就会返回null,结束runWorker的循环。进而执行runWorker方法中回收线程的操作。

这里需要我们理解阻塞队列poll方法的使用,poll方法接受一个时间参数,是一个阻塞操作,在给定的时间内没有获取到数据就返回null。poll方法的核心代码如下:

while (count == 0) { 
  if (nanos             
关注
打赏
1657692713
查看更多评论
0.0435s