ThreadPoolExecutor线程池介绍
一、概述
1.1 线程池的作用
线程池技术就是线程的重用技术,使用已经创建好的线程来执行当前任务,并提供了针对线程周期开销和资源冲突问题的解决方案。 由于请求到达时线程已经存在,因此消除了线程创建过程导致的延迟,使应用程序得到更快的响应。
1.2 线程池的好处
- 使用线程池可以重复利用已有的线程继续执行任务,避免线程在创建和销毁时造成的消耗;
- 由于没有线程创建和销毁时的消耗,可以提高系统响应速度;
- 通过线程池可以对线程进行合理的管理,根据系统的承受能力调整可运行线程数量的大小等。
1.3 线程池的结构
说明
Executor:线程池相关顶级接口,是 Java 异步任务目标的“执行者”接口,其目标是执行目标任务。只定义了一个
execute()
方法(void execute(Runnable command);),只能提交Runnable形式的任务,不支持提交Callable带有返回值的任务。Executor
作为执行者的角色,其目的是提供一种将“任务提交者”与“任务执行者”分离开来的机制。ExecutorService:继承并扩展了Executor接口,在Executor的基础上加入了线程池的生命周期管理,可以通过
shutdown
或者shutdownNow
方法来关闭线程池。ExecutorService
支持提交Callable形式的任务,提交完Callable任务后拿到一个Future(代表一个异步任务执行的结果)。AbstarctExecutorService:是一个抽象类,它实现了
ExecutorService
接口,为ExecutorService
接口中的方法提供默认实现。ThreadPoolExecutor:线程池中最核心的类,用来执行被提交的任务,后面会详细介绍。
ScheduledExecutorService:继承
ExecutorService
接口,并定义延迟或定期执行的方法。ScheduledThreadPoolExecutor:ThreadPoolExecutor子类,它在ThreadPoolExecutor基础上加入了任务定时执行的功能。
二、线程池状态
线程存在生命周期(六种),同样线程池也有生命周期,源码中定义了五种状态。
1 |
|
ThreadPoolExecutor
使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量
状态名 | 高3位 | 接收新任务 | 处理阻塞任务队列 | 说明 |
---|---|---|---|---|
RUNNING | 111 | Y | Y | 能接受新提交的任务,并且也能处理阻塞队列中的任务 |
SHUTDOWN | 000 | N | Y | 关闭状态,不会接收新任务,但会处理阻塞队列剩余任务 |
STOP | 001 | N | N | 不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程,并抛弃阻塞队列任务 |
TIDYING | 010 | - | - | 任务全执行完毕,活动线程为 0 即将进入终结 |
TERMINATED | 011 | - | - | 终结状态 |
- 线程池生命周期示意图
说明
- 刚开始线程处于RUNNING状态,能接受新提交的任务,并且也能处理阻塞队列中的任务;
- 当调用了线程池的shutdown()方法进入到SHUTDOWN状态,此时意图停止线程池,它是一种比较温和的,正在执行的任务和阻塞队列中的任务,都不会取消掉,等这些任务都被处理掉后,线程池才会关闭,但是调用了shutdown()以后,就不会再接收新任务了;
- 当调用了线程池中的
shutdownNow()
方法,此时进入到STOP状态,暴力停止线程池,不会接收新任务,正在执行的任务也会被打断(通过调用interrupt()来打断),阻塞队列的任务也会被抛弃掉; - TIDYING状态是一个过渡状态,当任务全部执行完毕,活动的线程也为0了,线程池即将进入TERMINATED状态;
- TERMINATED状态的线程池已经不能工作了,处于终结状态。
从数字上比较:TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING
(RUNNING为负数,因为是高三位,最高位为符号位)
为了保证状态信息以及线程数量在赋值时的原子性;他们被存储在一个原子变量ctl中,目的是将线程池状态与线程个数合二为一,这样就可以用一次cas原子操作进行赋值。
1 |
|
三、ThreadPoolExecutor构造方法
1 |
|
ThreadPoolExecutor包含了7个核心参数:
corePoolSize
:线程池中的核心线程数maximumPoolSize
:最大线程池的大小keepAliveTime
:当线程池中线程数大于corePoolSize
,并且没有可执行任务时大于corePoolSize
那部分线程的存活时间unit
:keepAliveTime
的时间单位workQueue
:用来暂时保存任务的工作队列threadFactory
:线程工厂提供线程的创建方式,默认使用Executors.defaultThreadFactory()
handler
:当线程池所处理的任务数超过其承载容量或关闭后继续有任务提交时,所调用的拒绝策略
核心参数详解
ThreadPoolExecutor中包含了七大核心参数,如果需要对线程池进行定制化操作,需要对其中比较核心的参数进行一定程度的认识。
corePoolSize
ThreadPoolExecutor
会根据corePoolSize
和maximumPoolSize
在构造方法中设置的边界值自动调整池大小,也可以使用setCorePoolSize
和setMaximumPoolSize
动态更改,关于线程数量的自动调整分为以下两种场景:
线程数量小于
corePoolSize
当在线程池中提交了一个新任务,并且运行的线程少于
corePoolSize
时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理该请求。线程数量介于
corePoolSize
和maximumPoolSize
之间如果运行的线程数多于
corePoolSize
但少于maximumPoolSize
,则仅当队列已满时才会创建新线程。如果
corePoolSize
和maximumPoolSize
相同,那么可以创建一个固定大小的线程池。如果maximumPoolSize
被设置为无界值(Integer.MAX_VALUE
),在资源允许的前提下,意味着线程池允许容纳任意数量的并发任务。
默认情况下,即使是核心线程也会在新任务到达时开始创建和启动,如果使用非空队列创建线程池,可以通过重写prestartCoreThread
或prestartAllCoreThreads
方法动态覆盖,进行线程预启动。
keepAliveTime
keepAliveTime
参数用来设置空闲时间。- 如果池当前有多于
corePoolSize
数量的线程,多余的线程如果空闲时间超过将会被终止,这种机制减少了在任务数量较少时线程池资源消耗。 - 如果某个时间需要处理的任务数量增加,则将构造新线程。
- 使用方法
setKeepAliveTime
可以动态更改参数值。
默认情况下,keep-alive
策略仅适用于超过corePoolSize
线程的情况,但是方法allowCoreThreadTimeOut
也可用于将此超时策略应用于核心线程,只要 keepAliveTime值不为零即可。
workQueue
workQueue参数用来指定存放提交任务的队列,任何BlockingQueue
都可以用来传输和保存提交的任务。
关于队列大小与线程数量之间存在这样的关系:
- 如果线程数少于
corePoolSize
,对于提交的新任务会创建一个新的线程处理,并不会把任务放入队列; - 如果线程数介于
corePoolSize
和maximumPoolSize
之间,新提交的任务会被放入阻塞队列中; - 如果线程池处于饱和状态,即无法创建线程也无法存放在阻塞队列,那么新任务将交由拒绝策略来处理。
线程池中的常用阻塞队列一般包含SynchronousQueue
、LinkedBlockingQueue
、ArrayBlockingQueue
几种,它们都是BlockingQueue
的实现类。
下面进行简单介绍:
SynchronousQueue(直接提交队列 )
SynchronousQueue
并不能算得上一个真正的队列,虽然实现了BlockingQueue
接口,但是并没有容量,不能存储任务。只是维护一组线程,在等待着把元素加入或移出队列,相当于直接交接任务给具体执行的线程。- 如果没有立即可用的线程来运行任务,则尝试将任务排队失败,因此将构造一个新线程。
- 在处理可能具有内部依赖关系的请求集时,此策略可避免锁定。
- 这种队列方式通常需要无限的
maximumPoolSizes
以避免拒绝新提交的任务。 - 当任务提交的平均到达速度快于线程处理速度时,线程存在无限增长的可能性,而
CachedThreadPool
正式采用这种形式。
LinkedBlockingQueue(无界的任务队列)
LinkedBlockingQueue
是采用链表实现的无界队列,如果使用没有预定义容量的LinkedBlockingQueue
,当所有corePoolSize
线程都在处理任务时,将导致新任务都会在队列中等待,不会创建超过corePoolSize
个线程。这种场景下
maximumPoolSize
的值对于线程数量没有任何影响。这种依托队列处理任务的方式恰与
SynchronousQueue
依托线程处理任务的方式相反。
ArrayBlockingQueue(有界的任务队列)
ArrayBlockingQueue
是通过数组实现的有界队列。有界队列在与有限的
maximumPoolSizes
一起使用时有助于防止资源耗尽,但可能更难以调整和控制。使用
ArrayBlockingQueue
可以根据应用场景,预先估计池和队列的容量,互相权衡队列大小和最大池大小:- 使用大队列和小池:减少线程数量,可以最大限度地减少CPU使用率、操作系统资源和上下文切换开销,但可能会导致吞吐量降低
- 使用小队列大池:较大数量的线程,如果任务提交速度过快,会在短时间内提升CPU使用率,理论上可以提高系统的吞吐量。如果任务经常阻塞(如受到IO限制),会使得CPU切换更加频繁,可能会遇到更大的调度开销,这也会降低吞吐量
PriorityBlockingQueue(优先任务队列)
PriorityBlockingQueue
其实是一个特殊的无界队列,它其中无论添加了多少个任务,线程池创建的线程数也不会超过corePoolSize的数量。- 只不过其他队列一般是按照先进先出的规则处理任务,而
PriorityBlockingQueue
队列可以自定义规则根据任务的优先级顺序先后执行。
threadFactory
- 该参数提供了线程池中线程的创建方式,这里使用了工厂模式
ThreadFactory
创建新线程。 - 默认情况下,会使用
Executors.defaultThreadFactory
,它创建的线程都在同一个ThreadGroup
中,并具有相同的NORM_PRIORITY
优先级和非守护进程状态。 - 也可以根据实际场景自定义
ThreadFactory
,可以更改线程的名称、线程组、优先级、守护程序状态等; - 在自定义情况下需要注意的是如果
ThreadFactory
在从newThread
返回null时未能创建线程,则执行程序将继续,但可能无法执行任何任务。 - 线程应该拥有“
modifyThread
”RuntimePermission
的权限。 - 如果工作线程或其他使用该池的线程不具备此权限,则服务可能会降级:配置更改可能无法及时生效,关闭池可能会一直处于可以终止但未完成的状态。
handler
如果线程池处于饱和状态,没有足够的线程数或者队列空间来处理提交的任务,或者是线程池已经处于关闭状态但还在处理进行中的任务,那么继续提交的任务就会根据线程池的拒绝策略处理。
无论哪种情况,execute方法都会调用其RejectedExecutionHandler
的rejectedExecution
方法。
线程池中提供了四个预定义的处理程序策略:
- ThreadPoolExecutor.AbortPolicy (默认):该策略会直接抛出异常,阻止系统正常工作 。
- ThreadPoolExecutor.DiscardPolicy:该策略会默默丢弃无法处理的任务,不予任何处理。当然使用此策略,业务场景中需允许任务的丢失。
- ThreadPoolExecutor.DiscardOldestPolicy:该策略会丢弃任务队列中最老的一个任务,也就是当前任务队列中最先被添加进去的,马上要被执行的那个任务,并尝试再次提交。
- ThreadPoolExecutor.CallerRunsPolicy:如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中运行。
这些预定义策略都实现了RejectedExecutionHandler
接口,也可以定义实现类重写拒绝策略。
线程池核心组成
线程池包含 3 个核心部分:
- 线程集合:核心线程和工作线程
- 阻塞队列:用于待执行任务排队
- 拒绝策略处理器:阻塞队列满后,对任务处理进行
执行流程
- 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
- 当线程数达到
corePoolSize
并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue
队列排队,直到有空闲的线程。 - 如果队列选择了有界队列,那么任务超过了队列大小时,会创建
maximumPoolSize - corePoolSize
数目的线程来救急。 - 如果线程到达
maximumPoolSize
仍然有新任务这时会执行拒绝策略。 - 当高峰过去后,超过
corePoolSize
的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由keepAliveTime
和 unit 来控制。
拒绝策略 jdk 提供了 4 种实现:
AbortPolicy
:让调用者抛出RejectedExecutionException
异常,这是默认策略CallerRunsPolicy
:让调用者运行任务DiscardPolicy
:放弃本次任务DiscardOldestPolicy
:放弃队列中最早的任务,本任务取而代之
其它著名框架也提供了实现:
Dubbo
的实现,在抛出RejectedExecutionException
异常之前会记录日志,并 dump 线程栈信息,方便定位问题- Netty 的实现,是创建一个新线程来执行任务
ActiveMQ
的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略PinPoint
的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
合理设置线程数
https://juejin.cn/post/7072281409053786120
四、Execute 原理
当一个新任务提交至线程池之后,线程池的处理流程如下:
- 首先判断当前运行的线程数量是否小于 corePoolSize。如果是,则创建一个工作线程来执行任务;如果都在执行任务,则进入步骤 2。
- 判断
BlockingQueue
是否已经满了,若没满,则将任务放入BlockingQueue
;若满了,则进入步骤 3。 - 判断当前运行的总线程数量是否小于 maximumPoolSize,如果是则创建一个新的工作线程来执行任务。
- 否则交给 RejectedExecutionHandler 来处理任务。
当 ThreadPoolExecutor 创建新线程时,通过 CAS 来更新线程池的状态 ctl。
五、ThreadPoolExecutor线程池提交任务方法
线程的各种提交方式的概念:
1 |
|
submit方法
submit方法与execute方法的区别在于:
- execute方法:接收的参数是Runnable类型的参数没有返回值
- submit方法:接收的是Callable类型的参数有返回值且返回值用
Future<>
接收
示例
1 |
|
运行结果
1 |
|
说明
从代码中我们看到submit()提交任务中实现了Callable接口,并在睡眠了1s后返回字符串“ok”,最后在主线程中调用get()方法,打印返回值到控制台。
invokeAll方法
invokeAll方法接收的是一个任务集合且有返回值,线程池中的线程执行这个任务集合。
等待所有的任务执行完成后统一返回。
示例
1 |
|
运行结果
1 |
|
结果分析
从结果可以看出,任务1与任务2同时被执行,但是因为线程池的核心线程数为2 所以任务3就先放入了任务队列,之后等待任务2执行完后,线程2执行任务3,中间差了0.5s 是因为任务2执行了0.5s,最终遍历返回结果并打印。
invokeAny方法
invokeAny方法提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消。
代码示例
1 |
|
运行结果
1 |
|
结果分析
程序中设置三个核心线程数,表示三个任务可以同时运行,不需要加入到任务队列中,从结果可以看出,任务2使用0.5s时间最先执行完,并返回结果,此时整个任务队列停止等待任务1和任务3继续执行,输出结果任务2。
六、ThreadPoolExecutor关闭线程池
shutdown
线程池状态变为 SHUTDOWN
- 不会接收新任务
- 但已提交任务会执行完
- 此方法不会阻塞调用线程的执行,比如如果主线程此时调用了这个shutDown方法,此时并不会阻塞主线程
1 |
|
- 源码
1 |
|
shutdownNow
线程池状态变为 STOP
- 不会接收新任务
- 会将队列中的任务返回
- 并用 interrupt 的方式中断正在执行的任务
1 |
|
- 源码
1 |
|
其他方法(了解)
1 |
|
代码示例
1 |
|
shutdown运行结果
1 |
|
结果分析
可以看出在shutdown后依然可以把 shutdown之前的任务运行完毕,但是shutdown之后的任务就没有再运行了。
另外awaitTermination方法的作用是等待shutdown部分的任务运行完后主线程再运行awaitTermination方法之后的代码。
shutdownNow运行结果
1 |
|
结果分析
从结果可以看出在shutdownNow之后只有一个任务运行成功了,也就是别的任务都已经被打断了。