什么是线程池
线程池和数据库连接池非常类似,可以统一管理和维护线程,减少没有必要的开销
为什么要使用线程池
因为频繁的开启线程或者停止线程,线程需要从新被cpu从就绪到运行状态调度,需要发生cpu的上下文切换,效率非常低。
线程池是复用机制
提前创建好一些固定的线程数一直在运行状态实现复用﹐从而可以减少就绪到运行状态的切换。
在哪些地方会使用线程池
实际开发项目中禁止自己new线程。必须使用线程池来维护和创建线程。防止资源耗尽。
线程池有哪些作用
核心点:
复用机制提前创建好固定的线程一直在运行状态实现复用限制线程创建数量。
- 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
- 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
- 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
- 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。
线程池的创建方式
底层构造方法
Executors.newCachedThreadPool()
可缓存线程池
创建无限大小的线程池 Integer.MAX_VALUE 正式开发等环境一般不使用
Executors.newFixedThreadPool()
可定长度线程池
Executors.newScheduledThreadPool()
可定时
继承于ThreadPoolExecutor
Executors.newSingleThreadExecutor()
单例
阿里巴巴规范手册不推荐使用JDK自带的线程池。
底层都是基于ThreadPoolExecutor封装
为什么阿里巴巴不建议使用Executors
因为默认的Executors线程池底层是基于ThreadPoolExecutor构造函数封装的,采用无界(MAX_VALUE)队列存放缓存任务,会一直缓存任务容易发生线程池队列溢出。
线程池底层是如何实现复用的
本质思想:创建一个线程,不会立马停止或者销毁而是一直实现****复用** 。**
- 提前创建固定大小的线程一直保持在正在运行状态;(可能会非常消耗 cpu的资源)
- 当需要线程执行任务,将该任务提交缓存在并发队列中;如果缓存队列满了,则会执行拒绝策略;
- 正在运行的线程从并发队列中获取任务执行从而实现多线程复用问题;
如何保证线程一直在运行状态
通过死循环实现
线程池核心点
线程池核心点:复用机制
1.提前创建好固定的线程一直在运行状态----死循环实现
2.提交的线程任务缓存到一个并发队列集合中,交给我们正在运行的线程执
3.正在运行的线程就从队列中获取该任务执行
线程池创建的线程会一直在运行状态吗
不会
例如:配置核心线程数 corePoolSize为2、最大线程数maximumPoolSize为5我们可以通过配置超出 corePoolSize核心线程数后创建的线程的存活时间例如为60s在60s内没有核心线程一直没有任务执行,则会停止该线程。
线程池执行原理
解析
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// ctl对象功能很强大,其高3位代表线程池的状态,低29位代表线程池中的线程数量
int c = ctl.get();
// 如果当前线程池中线程数量小于核心线程数,则新建一个线程并将当前任务直接赋予该线程执行
if (workerCountOf(c) < corePoolSize) {
// 如果新建线程成功则直接返回
if (addWorker(command, true))
return;
// 到这一步说明新建失败,可能是线程池意外关闭或者是由于并发的原因导致当前线程数大于等于核心线程数了,重新获取ctl对象
c = ctl.get();
}
// 如果当前线程处于运行态并且任务入队列成功,则继续执行下面的逻辑
if (isRunning(c) && workQueue.offer(command)) {
// 这里需要再次确认线程池是否仍处于运行态
int recheck = ctl.get();
// 如果非运行态则需要删除队列中的任务,然后拒绝该任务
if (! isRunning(recheck) && remove(command))
reject(command);
// 确保线程池中仍有运行的线程,如果都未存活则新建一个线程且不指定任务参数,让该线程自行去队列中获取任务执行
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果处于非运行态或者入队列不成功(队列满了),尝试扩容线程池线程数量至maxPoolSize,若扩容失败,则拒绝该任务
else if (!addWorker(command, false))
reject(command);
}
分3步进行:
- 如果运行的线程少于corePoolSize,请尝试以给定的命令作为第一个线程启动一个新线程的任务。对addWorker的调用会自动检查runState和
workerCount,这样可以防止添加错误警报当它不应该线程时,返回false。 - 如果一个任务可以成功排队,那么我们仍然需要再次检查我们是否应该添加一个线程(因为自上次检查以来已有的已经死亡)或其他进入这个方法后池就关闭了。所以我们重新检查状态,必要时回滚入队列的if已停止,如果没有则启动新线程。
- 如果不能将任务排队,则尝试添加新的线程。如果它失败了,我们知道我们已经关闭或饱和了因此拒绝了这项任务。
通俗:
1.当线程数小于核心线程数时,创建线程。
2.当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
3.当线程数大于等于核心线程数,且任务队列已满
若线程数小于最大线程数,创建线程
若线程数等于最大线程数,抛出异常,拒绝任务
addWorker()
private boolean addWorker(Runnable firstTask, boolean core) {
// 首先是一个外层死循环
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 检查当前线程池是否处于非运行态,同时确保队列中任务数不为空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 内存死循环修改运行的线程数量
for (;;) {
int wc = workerCountOf(c);
// core参数确保不会超过线程池设定的值
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 采用CAS算法将线程数+1,如果成功则直接跳出外循环,失败主要是因为并发修改导致,那么则再次内循环判断
if (compareAndIncrementWorkerCount(c))
break retry;
// 确保线程池运行状态没变,若发生改变,则从外循环开始判断
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 新建Worker内部类时主要干了两件事,一个是设置AQS同步锁标识为-1,另一个是调用线程工厂创建线程并赋值给Worker的成员变量
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 在往workers这个线程集合增加线程时需要进行加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 此处需要进行二次检查,防止线程池被关闭等异常情况
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果当前线程已经启动,处于活跃态则抛异常
if (t.isAlive())
throw new IllegalThreadStateException();
// workers是一个HashSet集合
workers.add(w);
int s = workers.size();
// 设置最大池大小,同时标识任务线程增加成功,即 workerAdded 设为true
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果任务线程成功增加,则在此处启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
看似很长其实都是在做一些基本的判断,防止异常情况,简单总结下:
- 首先判断是否处于运行态且当前运行的线程数是否小于上限值,若是则通过CAS算法先将运行线程数+1
- CAS操作成功后,新建Worker线程并通过加锁的方式将其加入到线程集合,然后启动线程
- 接下来看下 Worker 内部类,该类包装了任务线程,主要是为了控制线程中断,即当线程池关闭的时候需要中断对应的线程任务,这里说的中断是在等待从workQueue中获取任务getTask()时才能中断,即线程真正开始运行后才允许中断,因此初始化时lock状态为负值(-1)。
线程池核心参数
corePoolSize:核心线程数量一直正在保持运行的线程
maximumPoolSize:最大线程数,线程池允许创建的最大线程数
keepAliveTime:超出corePoolSize后创建的线程的存活时间。
unit: keepAliveTime的时间单位。
workQueue:任务队列,用于保存待执行的任务。
threadFactory:线程池内部创建线程所用的工厂。
handler:任务无法执行时的处理器。
重写线程池核心参数举例
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @author Wcy
* @Date 2022/10/5 22:12
*/
public class MyThreadPoolExecutor {
public static ExecutorService newFixedThreadPool(int corePoolSize, int maximumPoolSize, int queueSize) {
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 30L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(queueSize));
}
}
应用举例
import java.util.concurrent.ExecutorService;
/**
* @author Wcy
* @Date 2022/10/5 22:11
*/
public class MyThreadPool {
public static void main(String[] args) {
//1,提交的线壁任务数<核心没程数(核心线程数任务贬用》
//2.提交的线程任务数核心线程数且我们队列容量没有满将该任务缓存到我们队死中
//3.循环中 3,4,5,6,7 缓存到队列中
//4.最多再创建两个线程 执行 8,9
//5.再创建线程数超过最大线程数,将任务交给拒绝策略处理抛出异常
ExecutorService fixedThreadPool = MyThreadPoolExecutor.newFixedThreadPool(2, 4, 5);
for (int i = 0; i < 10; i++) {
final int index = i;
fixedThreadPool.execute(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "正在执行"+index);
}
});
}
}
}
/**
* pool-1-thread-1正在执行0
* pool-1-thread-4正在执行8
* pool-1-thread-2正在执行1
* pool-1-thread-3正在执行7
* pool-1-thread-3正在执行5
* pool-1-thread-2正在执行4
* pool-1-thread-4正在执行3
* pool-1-thread-1正在执行2
* pool-1-thread-3正在执行6
* Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.example.test.study.MyThreadPool$1@312b1dae rejected from java.util.concurrent.ThreadPoolExecutor@7530d0a[Running, pool size = 4, active threads = 4, queued tasks = 3, completed tasks = 1]
* at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
* at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
* at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
* at com.example.test.study.MyThreadPool.main(MyThreadPool.java:20)
*/
线程池队列满了,任务会丢失吗
如果队列满了,且任务总数>最大线程数则当前线程走拒绝策略。
可以自定义异拒绝异常,将该任务缓存到redis、本地文件、mysql中后期项目启动实现补偿。
1 .AbortPolicy丢弃任务,抛运行时异常
2.CallerRunsPolicy执行任务
3.DiscardPolicy 忽视,什么都不会发生
4.DiscardOldestPolicy从队列中踢出最先进入队列(最后一个执行)的任务
5.实现RejectedExecutionHandler接口,可自定义处理器
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author Wcy
* @Date 2022/10/5 22:36
*/
public class MyThreadHandler implements ThreadFactory, RejectedExecutionHandler {
@Override
public Thread newThread(Runnable r) {
return null;
}
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("线程池已满");
r.run();
}
}
import java.util.concurrent.*;
/**
* @author Wcy
* @Date 2022/10/5 22:12
*/
public class MyThreadPoolExecutor {
public static ExecutorService newFixedThreadPool(int corePoolSize, int maximumPoolSize, int queueSize) {
return new ThreadPoolExecutor(corePoolSize, maximumPoolSize, 30L, TimeUnit.MILLISECONDS,
(BlockingQueue<Runnable>) new LinkedBlockingQueue<Runnable>(queueSize), (RejectedExecutionHandler) new MyThreadHandler());
}
}
最终结果
线程池已满
pool-1-thread-2正在执行1
pool-1-thread-3正在执行7
pool-1-thread-1正在执行0
pool-1-thread-1正在执行4
pool-1-thread-1正在执行5
pool-1-thread-1正在执行6
pool-1-thread-2正在执行2
pool-1-thread-3正在执行3
pool-1-thread-4正在执行8
main正在执行9
手写线程池实现
package com.example.test.study;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
/**
* @author Wcy
* @Date 2022/10/5 21:20
*/
public class MyExecutors {
/**
* 创建一个线程池集合
*/
private static List<WorkThread> workThreads;
/**
* 创建一个阻塞队列(无界)
*/
private static BlockingDeque<Runnable> runnableBlockingDeque;
/**
* 开启/关闭线程池
*/
private static boolean isRun = true;
/**
* 创建一个线程池
* @param maxThreadCount
* @param queueSize
*/
public MyExecutors(int maxThreadCount, int queueSize) {
// 限制容量
runnableBlockingDeque = new LinkedBlockingDeque<>(queueSize);
workThreads = new ArrayList<WorkThread>(maxThreadCount);
for (int i = 0; i < maxThreadCount; i++) {
new WorkThread().start();
}
}
/**
* 提交任务
*
* @param command
*/
public boolean execute(Runnable command) {
return runnableBlockingDeque.offer(command);
}
/**
* 创建线程
*/
static class WorkThread extends Thread {
@Override
public void run() {
while (isRun) {
Runnable runnable = runnableBlockingDeque.poll();
if (runnable != null) {
runnable.run();
}
}
}
}
public static void main(String[] args) {
MyExecutors myExecutors = new MyExecutors(2, 2);
for (int i = 0; i < 10; i++) {
myExecutors.execute(new Runnable() {
@Override
public void run() {
System.out.println("当前线程:" + Thread.currentThread().getName());
}
});
}
myExecutors.isRun = false;
}
}
此处评论已关闭