介绍
线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。
线程池的优势:线程池做的工作只要是控制运行的线程数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
它的主要特点为:线程复用 ;控制最大并发数;管理线程
• 降低资源消耗: 通过重复利用已创建的线程降低线程创建和销毁造成的销耗。 • 提高响应速度: 当任务到达时,任务可以不需要等待线程创建就能立即执行。 • 提高线程的可管理性: 线程是稀缺资源,如果无限制的创建,不仅会销耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
架构Java中的线程池是通过Executor框架实现的,该框架中用到了Executor,Executors,ExecutorService,ThreadPoolExecutor这几个类
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize {
log.debug("task1");
int i = 1 / 0;
return true;
});
log.debug("result:{}", f.get());
}
}
运行结果
13:16:50 [pool-1-thread-1] d.FixedThreadPoolDemo - task1
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at com.dongguo.pool.FixedThreadPoolDemo.main(FixedThreadPoolDemo.java:24)
Caused by: java.lang.ArithmeticException: / by zero
at com.dongguo.pool.FixedThreadPoolDemo.lambda$main$0(FixedThreadPoolDemo.java:21)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
线程池底层工作原理(重要)
-
在创建了线程池后,线程池中的线程数为零
-
当调用execute()方法添加一个请求任务时,线程池会做出如下判断:
2.1 如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;
2.2 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;
2.3 如果这个时候队列满了且正在运行的线程数量还小于maximumPoolSize,那么还是要创建非核心线程立刻运行这个任务;
2.4 如果队列满了且正在运行的线程数量大于或等于maximumPoolSize,那么线程池会启动饱和拒绝策略来执行。
-
当一个线程完成任务时,它会从队列中取下一个任务来执行
-
当一个线程无事可做超过一定的时间(keepAliveTime)时,线程会判断:
4.1 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。
4.2 所以线程池的所有任务完成后,它最终会收缩到corePoolSize的大小
当队列满了,创建的非核心线程池是处理新的任务还是处理队列中的任务
线程池里的线程会去取这个队列里的任务。当一个新任务插入队列时,一个空闲线程就会成功的从队列中取出任务并且执行它。
当阻塞队列满了时,会创建非核心线程池处理新的任务,如果后续有线程空闲,会取出队列中的任务执行,新的任务如果发现核心线程满了,阻塞队列没满,就会进入队列中排队。
注意事项(重要)-
项目中创建多线程时,使用常见的三种线程池创建方式,单一、可变、定长都有一定问题,原因是FixedThreadPool和SingleThreadExecutor底层都是用LinkedBlockingQueue实现的,这个队列最大长度为Integer.MAX_VALUE,容易导致OOM。所以实际生产一般自己通过ThreadPoolExecutor的7个参数,自定义线程池
-
创建线程池推荐使用ThreadPoolExecutor及其7个参数手动创建 corePoolSize线程池的核心线程数 maximumPoolSize能容纳的最大线程数 keepAliveTime空闲线程存活时间 unit 存活的时间单位 workQueue 存放提交但未执行任务的队列 threadFactory 创建线程的工厂类 handler 等待队列满后的拒绝策略
-
为什么不允许适用不允许Executors.的方式手动创建线程池,如下图
-
拒绝策略AbortPolicy
package com.dongguo.pool;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/8/24 0024-21:52
* @description: 自定义创建线程池
*/
public class ThreadPoolDemo2 {
public static void main(String[] args) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(
2,
5,
2L,
TimeUnit.SECONDS,
new ArrayBlockingQueue(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.AbortPolicy()
);
//10个顾客请求
try {
for (int i = 1; i {
System.out.println(Thread.currentThread().getName() + "办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
pool.shutdown();
}
}
}
运行结果:
pool-1-thread-1办理业务
pool-1-thread-2办理业务
pool-1-thread-2办理业务
pool-1-thread-2办理业务
pool-1-thread-2办理业务
pool-1-thread-3办理业务
pool-1-thread-4办理业务
pool-1-thread-5办理业务
java.util.concurrent.RejectedExecutionException: Task com.dongguo.pool.ThreadPoolDemo2$$Lambda$1/1989780873@16b98e56 rejected from java.util.concurrent.ThreadPoolExecutor@7ef20235[Running, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 2]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.dongguo.pool.ThreadPoolDemo2.main(ThreadPoolDemo2.java:27)
拒绝策略CallerRunsPolicy
package com.dongguo.pool;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author Dongguo
* @date 2021/8/24 0024-21:52
* @description: 自定义创建线程池
*/
public class ThreadPoolDemo2 {
public static void main(String[] args) {
ThreadPoolExecutor pool = new ThreadPoolExecutor(
2,
5,
2L,
TimeUnit.SECONDS,
new ArrayBlockingQueue(3),
Executors.defaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy()
);
//10个顾客请求
try {
for (int i = 1; i {
System.out.println(Thread.currentThread().getName() + "办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
pool.shutdown();
}
}
}
运行结果
pool-1-thread-2办理业务
main办理业务
pool-1-thread-1办理业务
main办理业务
pool-1-thread-4办理业务
pool-1-thread-1办理业务
pool-1-thread-4办理业务
pool-1-thread-1办理业务
pool-1-thread-3办理业务
pool-1-thread-5办理业务
手写自定义线程池
手写自定义线程池(不带拒绝策略)
阻塞队列BlockingQueue
package com.dongguo.pool;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @author Dongguo
* @date 2021/9/13 0013-8:17
* @description: 自定义阻塞队列
*/
public class BlockingQueue {
// 1. 任务队列 双向队列
private Deque queue = new ArrayDeque();
// 2. 锁
private ReentrantLock lock = new ReentrantLock();
// 3. 生产者条件变量
private Condition fullWaitSet = lock.newCondition();
// 4. 消费者条件变量
private Condition emptyWaitSet = lock.newCondition();
// 5. 容量
private int capcity;
public BlockingQueue(int capcity) {
this.capcity = capcity;
}
/**
* 阻塞获取
* @return
*/
public T take(){
lock.lock();
try {
//当队列为空,无法获取,等待非空
while (queue.isEmpty()){
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//不为空获取队首元素
T t = queue.removeFirst();
//唤醒生产者
fullWaitSet.signal();
return t;
}finally {
lock.unlock();
}
}
/**
* 带超时时间阻塞添加
* @return
*/
public T poll(long timeout, TimeUnit unit){
lock.lock();
try {
//统一时间单位纳秒
long nanos = unit.toNanos(timeout);
//当队列为空,无法获取,等待非空
while (queue.isEmpty()){
try {
// 返回值是剩余时间
if (nanos
关注
打赏
最近更新
- 深拷贝和浅拷贝的区别(重点)
- 【Vue】走进Vue框架世界
- 【云服务器】项目部署—搭建网站—vue电商后台管理系统
- 【React介绍】 一文带你深入React
- 【React】React组件实例的三大属性之state,props,refs(你学废了吗)
- 【脚手架VueCLI】从零开始,创建一个VUE项目
- 【React】深入理解React组件生命周期----图文详解(含代码)
- 【React】DOM的Diffing算法是什么?以及DOM中key的作用----经典面试题
- 【React】1_使用React脚手架创建项目步骤--------详解(含项目结构说明)
- 【React】2_如何使用react脚手架写一个简单的页面?