您当前的位置: 首页 > 

Dongguo丶

暂无认证

  • 2浏览

    0关注

    472博文

    0收益

  • 0浏览

    0点赞

    0打赏

    0留言

私信
关注
热门博文

ThreadPool线程池

Dongguo丶 发布时间:2021-09-19 15:01:05 ,浏览量:2

介绍

线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。

线程池的优势:

线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。

它的主要特点为:

线程复用 ;控制最大并发数;管理线程

• 降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的销耗。 • 提高响应速度: 当任务到达时,任务可以不需要等待线程创建就能立即执行。 • 提高线程的可管理性: 线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

架构

Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor这几个类

image-20210824211714184

线程池ThreadPoolExecutor参数说明
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize  {
            log.debug("task1");
            int i = 1 / 0;
            return true;
        });
        log.debug("result:{}", f.get());
    }
}
运行结果
13:16:50 [pool-1-thread-1] d.FixedThreadPoolDemo - task1
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at com.dongguo.pool.FixedThreadPoolDemo.main(FixedThreadPoolDemo.java:24)
Caused by: java.lang.ArithmeticException: / by zero
	at com.dongguo.pool.FixedThreadPoolDemo.lambda$main$0(FixedThreadPoolDemo.java:21)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
线程池底层工作原理(重要)

image-20210824213244952

img

  1. 在创建了线程池后,线程池中的线程数为零

  2. 当调用execute()方法添加一个请求任务时,线程池会做出如下判断:

    2.1 如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;

    2.2 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;

    2.3 如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;

    2.4 如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。

  3. 当一个线程完成任务时,它会从队列中取下一个任务来执行

  4. 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:

    4.1 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。

    4.2 所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小

    当队列满了,创建的非核心线程池是处理新的任务还是处理队列中的任务

    线程池里的线程会去取这个队列里的任务。当一个新任务插入队列时,一个空闲线程就会成功的从队列中取出任务并且执行它。

    当阻塞队列满了时,会创建非核心线程池处理新的任务,如果后续有线程空闲,会取出队列中的任务执行,新的任务如果发现核心线程满了,阻塞队列没满,就会进入队列中排队。

    注意事项(重要)
    1. 项目中创建多线程时,使用常见的三种线程池创建方式,单一、可变、定长都有一定问题,原因是FixedThreadPool和SingleThreadExecutor底层都是用LinkedBlockingQueue实现的,这个队列最大长度为Integer.MAX_VALUE,容易导致OOM。所以实际生产一般自己通过ThreadPoolExecutor的7个参数,自定义线程池

    2. 创建线程池推荐使用ThreadPoolExecutor及其7个参数手动创建 corePoolSize线程池的核心线程数 maximumPoolSize能容纳的最大线程数 keepAliveTime空闲线程存活时间 unit 存活的时间单位 workQueue 存放提交但未执行任务的队列 threadFactory 创建线程的工厂类 handler 等待队列满后的拒绝策略

    3. 为什么不允许适用不允许Executors.的方式手动创建线程池,如下图

      image-20210824213511606

自定义线程池ThreadPoolExecutor

拒绝策略AbortPolicy

package com.dongguo.pool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author Dongguo
 * @date 2021/8/24 0024-21:52
 * @description: 自定义创建线程池
 */
public class ThreadPoolDemo2 {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,
                5,
                2L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.AbortPolicy()
        );
        //10个顾客请求
        try {
            for (int i = 1; i  {
                    System.out.println(Thread.currentThread().getName() + "办理业务");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            pool.shutdown();
        }
    }
}
运行结果:
pool-1-thread-1办理业务
pool-1-thread-2办理业务
pool-1-thread-2办理业务
pool-1-thread-2办理业务
pool-1-thread-2办理业务
pool-1-thread-3办理业务
pool-1-thread-4办理业务
pool-1-thread-5办理业务
java.util.concurrent.RejectedExecutionException: Task com.dongguo.pool.ThreadPoolDemo2$$Lambda$1/1989780873@16b98e56 rejected from java.util.concurrent.ThreadPoolExecutor@7ef20235[Running, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 2]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at com.dongguo.pool.ThreadPoolDemo2.main(ThreadPoolDemo2.java:27)

拒绝策略CallerRunsPolicy

package com.dongguo.pool;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author Dongguo
 * @date 2021/8/24 0024-21:52
 * @description: 自定义创建线程池
 */
public class ThreadPoolDemo2 {
    public static void main(String[] args) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2,
                5,
                2L,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.CallerRunsPolicy()
        );
        //10个顾客请求
        try {
            for (int i = 1; i  {
                    System.out.println(Thread.currentThread().getName() + "办理业务");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            pool.shutdown();
        }
    }
}
运行结果
pool-1-thread-2办理业务
main办理业务
pool-1-thread-1办理业务
main办理业务
pool-1-thread-4办理业务
pool-1-thread-1办理业务
pool-1-thread-4办理业务
pool-1-thread-1办理业务
pool-1-thread-3办理业务
pool-1-thread-5办理业务
手写自定义线程池 手写自定义线程池(不带拒绝策略)

阻塞队列BlockingQueue

package com.dongguo.pool;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author Dongguo
 * @date 2021/9/13 0013-8:17
 * @description: 自定义阻塞队列
 */
public class BlockingQueue {
    // 1. 任务队列  双向队列
    private Deque queue = new ArrayDeque();
    // 2. 锁
    private ReentrantLock lock = new ReentrantLock();
    // 3. 生产者条件变量
    private Condition fullWaitSet = lock.newCondition();
    // 4. 消费者条件变量
    private Condition emptyWaitSet = lock.newCondition();
    // 5. 容量
    private int capcity;

    public BlockingQueue(int capcity) {
        this.capcity = capcity;
    }

    /**
     * 阻塞获取
     * @return
     */
    public T take(){
        lock.lock();
        try {
            //当队列为空,无法获取,等待非空
            while (queue.isEmpty()){
                try {
                    emptyWaitSet.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //不为空获取队首元素
            T t = queue.removeFirst();
            //唤醒生产者
            fullWaitSet.signal();
            return t;
        }finally {
            lock.unlock();
        }
    }
    /**
     * 带超时时间阻塞添加
     * @return
     */
    public T poll(long timeout, TimeUnit unit){
        lock.lock();
        try {
            //统一时间单位纳秒
            long nanos = unit.toNanos(timeout);
            //当队列为空,无法获取,等待非空
            while (queue.isEmpty()){
                try {
                    // 返回值是剩余时间
                    if (nanos             
关注
打赏
1638062488
查看更多评论
0.0885s