ThreadPoolExecutor线程池介绍

一、概述

1.1 线程池的作用

线程池技术就是线程的重用技术,使用已经创建好的线程来执行当前任务,并提供了针对线程周期开销和资源冲突问题的解决方案。 由于请求到达时线程已经存在,因此消除了线程创建过程导致的延迟,使应用程序得到更快的响应。

1.2 线程池的好处

  • 使用线程池可以重复利用已有的线程继续执行任务,避免线程在创建和销毁时造成的消耗;
  • 由于没有线程创建和销毁时的消耗,可以提高系统响应速度;
  • 通过线程池可以对线程进行合理的管理,根据系统的承受能力调整可运行线程数量的大小等。

1.3 线程池的结构

image-20230330163404438

说明

  • Executor:线程池相关顶级接口,是 Java 异步任务目标的“执行者”接口,其目标是执行目标任务。只定义了一个execute()方法(void execute(Runnable command);),只能提交Runnable形式的任务,不支持提交Callable带有返回值的任务。

    Executor作为执行者的角色,其目的是提供一种将“任务提交者”与“任务执行者”分离开来的机制。

  • ExecutorService:继承并扩展了Executor接口,在Executor的基础上加入了线程池的生命周期管理,可以通过shutdown或者shutdownNow方法来关闭线程池。ExecutorService支持提交Callable形式的任务,提交完Callable任务后拿到一个Future(代表一个异步任务执行的结果)。

  • AbstarctExecutorService:是一个抽象类,它实现了 ExecutorService 接口,为 ExecutorService 接口中的方法提供默认实现。

  • ThreadPoolExecutor:线程池中最核心的类,用来执行被提交的任务,后面会详细介绍。

  • ScheduledExecutorService:继承ExecutorService接口,并定义延迟或定期执行的方法。

  • ScheduledThreadPoolExecutor:ThreadPoolExecutor子类,它在ThreadPoolExecutor基础上加入了任务定时执行的功能。

二、线程池状态

线程存在生命周期(六种),同样线程池也有生命周期,源码中定义了五种状态。

1
2
3
4
5
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量

状态名 高3位 接收新任务 处理阻塞任务队列 说明
RUNNING 111 Y Y 能接受新提交的任务,并且也能处理阻塞队列中的任务
SHUTDOWN 000 N Y 关闭状态,不会接收新任务,但会处理阻塞队列剩余任务
STOP 001 N N 不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程,并抛弃阻塞队列任务
TIDYING 010 - - 任务全执行完毕,活动线程为 0 即将进入终结
TERMINATED 011 - - 终结状态
  • 线程池生命周期示意图

image-20230330163535335

说明

  • 刚开始线程处于RUNNING状态,能接受新提交的任务,并且也能处理阻塞队列中的任务;
  • 当调用了线程池的shutdown()方法进入到SHUTDOWN状态,此时意图停止线程池,它是一种比较温和的,正在执行的任务和阻塞队列中的任务,都不会取消掉,等这些任务都被处理掉后,线程池才会关闭,但是调用了shutdown()以后,就不会再接收新任务了;
  • 当调用了线程池中的shutdownNow()方法,此时进入到STOP状态,暴力停止线程池,不会接收新任务,正在执行的任务也会被打断(通过调用interrupt()来打断),阻塞队列的任务也会被抛弃掉;
  • TIDYING状态是一个过渡状态,当任务全部执行完毕,活动的线程也为0了,线程池即将进入TERMINATED状态;
  • TERMINATED状态的线程池已经不能工作了,处于终结状态。

从数字上比较:TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING(RUNNING为负数,因为是高三位,最高位为符号位)

为了保证状态信息以及线程数量在赋值时的原子性;他们被存储在一个原子变量ctl中,目的是将线程池状态与线程个数合二为一,这样就可以用一次cas原子操作进行赋值。

1
2
3
4
// c为旧值,ctlOf返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workCountOf(c)));
// rs为高3位,代表线程池状态,wc为低29位代表线程个数,ctlOf是合并他们
private static int ctlOf(int rs, int wc){return rs | wc;}

三、ThreadPoolExecutor构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

ThreadPoolExecutor包含了7个核心参数

  • corePoolSize:线程池中的核心线程数

  • maximumPoolSize:最大线程池的大小

  • keepAliveTime:当线程池中线程数大于corePoolSize,并且没有可执行任务时大于corePoolSize那部分线程的存活时间

  • unitkeepAliveTime的时间单位

  • workQueue:用来暂时保存任务的工作队列

  • threadFactory:线程工厂提供线程的创建方式,默认使用Executors.defaultThreadFactory()

  • handler:当线程池所处理的任务数超过其承载容量或关闭后继续有任务提交时,所调用的拒绝策略

核心参数详解

ThreadPoolExecutor中包含了七大核心参数,如果需要对线程池进行定制化操作,需要对其中比较核心的参数进行一定程度的认识。

corePoolSize

ThreadPoolExecutor会根据corePoolSizemaximumPoolSize在构造方法中设置的边界值自动调整池大小,也可以使用setCorePoolSizesetMaximumPoolSize动态更改,关于线程数量的自动调整分为以下两种场景:

  • 线程数量小于corePoolSize

    当在线程池中提交了一个新任务,并且运行的线程少于corePoolSize时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理该请求。

  • 线程数量介于corePoolSizemaximumPoolSize之间

    如果运行的线程数多于corePoolSize但少于maximumPoolSize,则仅当队列已满时才会创建新线程。

    如果corePoolSizemaximumPoolSize相同,那么可以创建一个固定大小的线程池。如果maximumPoolSize被设置为无界值(Integer.MAX_VALUE),在资源允许的前提下,意味着线程池允许容纳任意数量的并发任务。

默认情况下,即使是核心线程也会在新任务到达时开始创建和启动,如果使用非空队列创建线程池,可以通过重写prestartCoreThreadprestartAllCoreThreads方法动态覆盖,进行线程预启动。

keepAliveTime
  • keepAliveTime参数用来设置空闲时间。
  • 如果池当前有多于corePoolSize数量的线程,多余的线程如果空闲时间超过将会被终止,这种机制减少了在任务数量较少时线程池资源消耗。
  • 如果某个时间需要处理的任务数量增加,则将构造新线程。
  • 使用方法setKeepAliveTime可以动态更改参数值。

默认情况下,keep-alive策略仅适用于超过corePoolSize线程的情况,但是方法allowCoreThreadTimeOut也可用于将此超时策略应用于核心线程,只要 keepAliveTime值不为零即可。

workQueue

workQueue参数用来指定存放提交任务的队列,任何BlockingQueue都可以用来传输和保存提交的任务。

关于队列大小与线程数量之间存在这样的关系

  • 如果线程数少于corePoolSize,对于提交的新任务会创建一个新的线程处理,并不会把任务放入队列;
  • 如果线程数介于corePoolSizemaximumPoolSize之间,新提交的任务会被放入阻塞队列中;
  • 如果线程池处于饱和状态,即无法创建线程也无法存放在阻塞队列,那么新任务将交由拒绝策略来处理。

线程池中的常用阻塞队列一般包含SynchronousQueueLinkedBlockingQueueArrayBlockingQueue几种,它们都是BlockingQueue的实现类。

下面进行简单介绍

  • SynchronousQueue(直接提交队列 )

    • SynchronousQueue并不能算得上一个真正的队列,虽然实现了BlockingQueue接口,但是并没有容量,不能存储任务。只是维护一组线程,在等待着把元素加入或移出队列,相当于直接交接任务给具体执行的线程。
    • 如果没有立即可用的线程来运行任务,则尝试将任务排队失败,因此将构造一个新线程。
    • 在处理可能具有内部依赖关系的请求集时,此策略可避免锁定。
    • 这种队列方式通常需要无限的maximumPoolSizes以避免拒绝新提交的任务。
    • 当任务提交的平均到达速度快于线程处理速度时,线程存在无限增长的可能性,而CachedThreadPool正式采用这种形式。
  • LinkedBlockingQueue(无界的任务队列)

    • LinkedBlockingQueue是采用链表实现的无界队列,如果使用没有预定义容量的LinkedBlockingQueue,当所有corePoolSize线程都在处理任务时,将导致新任务都会在队列中等待,不会创建超过corePoolSize个线程。

      这种场景下maximumPoolSize的值对于线程数量没有任何影响。

    • 这种依托队列处理任务的方式恰与SynchronousQueue依托线程处理任务的方式相反。

  • ArrayBlockingQueue(有界的任务队列)

    • ArrayBlockingQueue是通过数组实现的有界队列。

    • 有界队列在与有限的maximumPoolSizes一起使用时有助于防止资源耗尽,但可能更难以调整和控制。

    • 使用ArrayBlockingQueue可以根据应用场景,预先估计池和队列的容量,互相权衡队列大小和最大池大小:

      • 使用大队列和小池:减少线程数量,可以最大限度地减少CPU使用率、操作系统资源和上下文切换开销,但可能会导致吞吐量降低
      • 使用小队列大池:较大数量的线程,如果任务提交速度过快,会在短时间内提升CPU使用率,理论上可以提高系统的吞吐量。如果任务经常阻塞(如受到IO限制),会使得CPU切换更加频繁,可能会遇到更大的调度开销,这也会降低吞吐量
  • PriorityBlockingQueue(优先任务队列)

    • PriorityBlockingQueue其实是一个特殊的无界队列,它其中无论添加了多少个任务,线程池创建的线程数也不会超过corePoolSize的数量。
    • 只不过其他队列一般是按照先进先出的规则处理任务,而PriorityBlockingQueue队列可以自定义规则根据任务的优先级顺序先后执行。
threadFactory
  • 该参数提供了线程池中线程的创建方式,这里使用了工厂模式ThreadFactory创建新线程。
  • 默认情况下,会使用 Executors.defaultThreadFactory,它创建的线程都在同一个ThreadGroup中,并具有相同的NORM_PRIORITY优先级和非守护进程状态。
  • 也可以根据实际场景自定义ThreadFactory,可以更改线程的名称、线程组、优先级、守护程序状态等;
  • 在自定义情况下需要注意的是如果ThreadFactory在从newThread返回null时未能创建线程,则执行程序将继续,但可能无法执行任何任务。
  • 线程应该拥有“modifyThreadRuntimePermission的权限。
  • 如果工作线程或其他使用该池的线程不具备此权限,则服务可能会降级:配置更改可能无法及时生效,关闭池可能会一直处于可以终止但未完成的状态。
handler

如果线程池处于饱和状态,没有足够的线程数或者队列空间来处理提交的任务,或者是线程池已经处于关闭状态但还在处理进行中的任务,那么继续提交的任务就会根据线程池的拒绝策略处理。

无论哪种情况,execute方法都会调用其RejectedExecutionHandlerrejectedExecution方法。

线程池中提供了四个预定义的处理程序策略

  • ThreadPoolExecutor.AbortPolicy (默认):该策略会直接抛出异常,阻止系统正常工作 。
  • ThreadPoolExecutor.DiscardPolicy:该策略会默默丢弃无法处理的任务,不予任何处理。当然使用此策略,业务场景中需允许任务的丢失。
  • ThreadPoolExecutor.DiscardOldestPolicy:该策略会丢弃任务队列中最老的一个任务,也就是当前任务队列中最先被添加进去的,马上要被执行的那个任务,并尝试再次提交。
  • ThreadPoolExecutor.CallerRunsPolicy:如果线程池的线程数量达到上限,该策略会把任务队列中的任务放在调用者线程当中运行。

这些预定义策略都实现了RejectedExecutionHandler接口,也可以定义实现类重写拒绝策略。

线程池核心组成

image-20230330163730801

线程池包含 3 个核心部分

  • 线程集合:核心线程和工作线程
  • 阻塞队列:用于待执行任务排队
  • 拒绝策略处理器:阻塞队列满后,对任务处理进行

执行流程

  • 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
  • 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排队,直到有空闲的线程。
  • 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线程来救急。
  • 如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。
  • 当高峰过去后,超过corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由keepAliveTime 和 unit 来控制。

拒绝策略 jdk 提供了 4 种实现

  • AbortPolicy:让调用者抛出 RejectedExecutionException 异常,这是默认策略
  • CallerRunsPolicy:让调用者运行任务
  • DiscardPolicy:放弃本次任务
  • DiscardOldestPolicy:放弃队列中最早的任务,本任务取而代之

其它著名框架也提供了实现

  • Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方便定位问题
  • Netty 的实现,是创建一个新线程来执行任务
  • ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
  • PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略

合理设置线程数

https://juejin.cn/post/7072281409053786120

四、Execute 原理

当一个新任务提交至线程池之后,线程池的处理流程如下:

image-20230330163747491

  1. 首先判断当前运行的线程数量是否小于 corePoolSize。如果是,则创建一个工作线程来执行任务;如果都在执行任务,则进入步骤 2。
  2. 判断 BlockingQueue 是否已经满了,若没满,则将任务放入 BlockingQueue;若满了,则进入步骤 3。
  3. 判断当前运行的总线程数量是否小于 maximumPoolSize,如果是则创建一个新的工作线程来执行任务。
  4. 否则交给 RejectedExecutionHandler 来处理任务。

当 ThreadPoolExecutor 创建新线程时,通过 CAS 来更新线程池的状态 ctl。

五、ThreadPoolExecutor线程池提交任务方法

线程的各种提交方式的概念:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 执行任务
void execute(Runnable command);

// 提交任务 task,用返回值 Future 获得任务执行结果,会有返回值
<T> Future<T> submit(Callable<T> task);

// 提交 tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;

// 提交 tasks 中所有任务,带超时时间,时间超时后,会放弃执行后面的任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;

// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;

// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;

submit方法

submit方法与execute方法的区别在于:

  • execute方法:接收的参数是Runnable类型的参数没有返回值
  • submit方法:接收的是Callable类型的参数有返回值且返回值用Future<>接收

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.*;

/**
* Created by xiaowei
* Date 2022/11/20
* Description submit方法示例
*/
@Slf4j(topic = "c.ThreadPoolDemo01")
public class ThreadPoolDemo01 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建线程池
ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10,
20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
new ThreadPoolExecutor.DiscardOldestPolicy());
//调用submit提交任务
Future<String> future = threadpool.submit(new Callable<String>() {

@Override
public String call() throws Exception {
log.debug("running");
Thread.sleep(1000);
return "ok";
}

});
//获取线程的返回结果
log.debug("{}",future.get());
}
}

运行结果

1
2
17:03:36.791 [pool-1-thread-1] DEBUG c.ThreadPoolDemo01 - running
17:03:37.796 [main] DEBUG c.ThreadPoolDemo01 - ok

说明

从代码中我们看到submit()提交任务中实现了Callable接口,并在睡眠了1s后返回字符串“ok”,最后在主线程中调用get()方法,打印返回值到控制台。

invokeAll方法

invokeAll方法接收的是一个任务集合且有返回值,线程池中的线程执行这个任务集合。

等待所有的任务执行完成后统一返回。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;

/**
* Created by xiaowei
* Date 2022/11/20
* Description invokeAll方法示例
*/
@Slf4j(topic = "c.ThreadPoolDemo02")
public class ThreadPoolDemo02 {
public static void main(String[] args) throws InterruptedException {
//创建线程池
ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10,
20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
new ThreadPoolExecutor.DiscardOldestPolicy());

//返回的是一个泛型为Future的集合
List<Future<String>> futures = threadpool.invokeAll(Arrays.asList(
() -> {
log.debug("begin1");
Thread.sleep(1000);
return "1";
},
() -> {
log.debug("begin2");
Thread.sleep(500);
return "2";
},
() -> {
log.debug("begin3");
Thread.sleep(2000);
return "3";
}
));

//遍历输出结果
futures.forEach(f -> {
try {
log.debug("{}",f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
}
}

运行结果

1
2
3
4
5
6
17:22:03.437 [pool-1-thread-1] DEBUG c.ThreadPoolDemo02 - begin1
17:22:03.437 [pool-1-thread-2] DEBUG c.ThreadPoolDemo02 - begin2
17:22:03.942 [pool-1-thread-2] DEBUG c.ThreadPoolDemo02 - begin3
17:22:05.946 [main] DEBUG c.ThreadPoolDemo02 - 1
17:22:05.948 [main] DEBUG c.ThreadPoolDemo02 - 2
17:22:05.948 [main] DEBUG c.ThreadPoolDemo02 - 3

结果分析

从结果可以看出,任务1与任务2同时被执行,但是因为线程池的核心线程数为2 所以任务3就先放入了任务队列,之后等待任务2执行完后,线程2执行任务3,中间差了0.5s 是因为任务2执行了0.5s,最终遍历返回结果并打印。

invokeAny方法

invokeAny方法提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消。

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import lombok.extern.slf4j.Slf4j;

import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* Created by xiaowei
* Date 2022/11/20
* Description invokeAny方法示例
*/
@Slf4j(topic = "c.ThreadPoolDemo03")
public class ThreadPoolDemo03 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//创建线程池,核心线程数为3
ThreadPoolExecutor threadpool=new ThreadPoolExecutor(3, 10,
20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
new ThreadPoolExecutor.DiscardOldestPolicy());

String future = threadpool.invokeAny(Arrays.asList(
() -> {
log.debug("begin1");
Thread.sleep(1000);
log.debug("end1");
return "1";
},
() -> {
log.debug("begin2");
Thread.sleep(500);
log.debug("end2");
return "2";
},
() -> {
log.debug("begin3");
Thread.sleep(2000);
log.debug("end3");
return "3";
}
));

log.debug("{}",future);
}
}

运行结果

1
2
3
4
5
17:34:43.143 [pool-1-thread-1] DEBUG c.ThreadPoolDemo03 - begin1
17:34:43.147 [pool-1-thread-3] DEBUG c.ThreadPoolDemo03 - begin3
17:34:43.147 [pool-1-thread-2] DEBUG c.ThreadPoolDemo03 - begin2
17:34:43.648 [pool-1-thread-2] DEBUG c.ThreadPoolDemo03 - end2
17:34:43.648 [main] DEBUG c.ThreadPoolDemo03 - 2

结果分析

程序中设置三个核心线程数,表示三个任务可以同时运行,不需要加入到任务队列中,从结果可以看出,任务2使用0.5s时间最先执行完,并返回结果,此时整个任务队列停止等待任务1和任务3继续执行,输出结果任务2。

六、ThreadPoolExecutor关闭线程池

shutdown

线程池状态变为 SHUTDOWN

  • 不会接收新任务
  • 但已提交任务会执行完
  • 此方法不会阻塞调用线程的执行,比如如果主线程此时调用了这个shutDown方法,此时并不会阻塞主线程
1
void shutdown();
  • 源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改线程池状态
advanceRunState(SHUTDOWN);
// 仅会打断空闲线程
interruptIdleWorkers();
onShutdown(); // 扩展点 ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等)
tryTerminate();
}

shutdownNow

线程池状态变为 STOP

  • 不会接收新任务
  • 会将队列中的任务返回
  • 并用 interrupt 的方式中断正在执行的任务
1
List<Runnable> shutdownNow();
  • 源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 修改线程池状态
advanceRunState(STOP);
// 打断所有线程
interruptWorkers();
// 获取队列中剩余任务
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试终结
tryTerminate();
return tasks;
}

其他方法(了解)

1
2
3
4
5
6
7
// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();
// 线程池状态是否是 TERMINATED
boolean isTerminated();
// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,因此如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待
// 一般task是Callable类型的时候不用此方法,因为futureTask.get方法自带等待功能。
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.concurrent.*;

/**
* Created by xiaowei
* Date 2022/11/20
* Description 1.0
*/
@Slf4j(topic = "c.ThreadPoolDemo04")
public class ThreadPoolDemo04 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadPoolExecutor threadpool=new ThreadPoolExecutor(2, 10,
20, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10),
new ThreadPoolExecutor.DiscardOldestPolicy());

Future<Integer> result1 = threadpool.submit(() -> {
log.debug("task 1 running...");
Thread.sleep(1000);
log.debug("task 1 finish...");
return 1;
});

Future<Integer> result2 = threadpool.submit(() -> {
log.debug("task 2 running...");
Thread.sleep(1000);
log.debug("task 2 finish...");
return 2;
});

Future<Integer> result3 = threadpool.submit(() -> {
log.debug("task 3 running...");
Thread.sleep(1000);
log.debug("task 3 finish...");
return 3;
});

// shutdown() 部分的代码
log.debug("shutdown");
threadpool.shutdown();
threadpool.submit(()->{
log.debug("task 4 running...");
Thread.sleep(1000);
log.debug("task 4 finish");
return "4";
});
threadpool.awaitTermination(3, TimeUnit.SECONDS);
log.debug("other...");

// shutdownNow部分代码
// log.debug("shutdownNow");
// List<Runnable> runnables = threadpool.shutdownNow();
// log.debug("other.... {}" , runnables);
}
}

shutdown运行结果

1
2
3
4
5
6
7
8
18:03:12.408 [pool-1-thread-2] DEBUG c.ThreadPoolDemo04 - task 2 running...
18:03:12.408 [pool-1-thread-1] DEBUG c.ThreadPoolDemo04 - task 1 running...
18:03:12.408 [main] DEBUG c.ThreadPoolDemo04 - shutdown
18:03:13.412 [pool-1-thread-1] DEBUG c.ThreadPoolDemo04 - task 1 finish...
18:03:13.412 [pool-1-thread-1] DEBUG c.ThreadPoolDemo04 - task 3 running...
18:03:13.412 [pool-1-thread-2] DEBUG c.ThreadPoolDemo04 - task 2 finish...
18:03:14.412 [pool-1-thread-1] DEBUG c.ThreadPoolDemo04 - task 3 finish...
18:03:14.412 [main] DEBUG c.ThreadPoolDemo04 - other...

结果分析

可以看出在shutdown后依然可以把 shutdown之前的任务运行完毕,但是shutdown之后的任务就没有再运行了。

另外awaitTermination方法的作用是等待shutdown部分的任务运行完后主线程再运行awaitTermination方法之后的代码。

shutdownNow运行结果

1
2
3
4
18:06:29.592 [main] DEBUG c.ThreadPoolDemo04 - shutdownNow
18:06:29.592 [pool-1-thread-1] DEBUG c.ThreadPoolDemo04 - task 1 running...
18:06:29.592 [pool-1-thread-2] DEBUG c.ThreadPoolDemo04 - task 2 running...
18:06:29.595 [main] DEBUG c.ThreadPoolDemo04 - other.... [java.util.concurrent.FutureTask@6aa8ceb6]

结果分析

从结果可以看出在shutdownNow之后只有一个任务运行成功了,也就是别的任务都已经被打断了。


ThreadPoolExecutor线程池介绍
http://example.com/2023/03/30/ThreadPoolExecutor线程池介绍/
作者
程序员小魏
发布于
2023年3月30日
许可协议