Executors工具类介绍

一、Executors类概述

Executors是Executor框架的工具类,提供了几种线程池创建方法,以及线程池中默认配置(如线程工厂)的处理,下面会对其中常用的几种创建线程池的方式进行说明。

线程池的创建分为两种方式:ThreadPoolExecutor 和 Executors;

这两种方式的创建都是返回一个ExecutorService接口类型的实现类类型,对于ExecutorService接口常用方法可参考这篇文章:ExecutorService接口使用

二、Executors常用的方法

方法 说明
public static ExecutorService newFixedThreadPool(int nThreads) 一种线程数量固定的线程池,当线程处于空闲状态时,他们并不会被回收,除非线程池被关闭。当所有的线程都处于活动状态时,新的任务都会处于等待状态,直到有线程空闲出来。
public static ExecutorService newSingleThreadExecutor() 创建单个线程。它适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的应用场景。
public static ExecutorService newCachedThreadPool() 创建一个根据需要创建新线程的线程池,但在可用时将重新使用以前构造的线程, 如果没有可用的线程,将创建一个新的线程并将其添加到该池中。 未使用六十秒的线程将被终止并从缓存中删除。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) 创建一个线程池,可以调度命令在给定的延迟之后运行,或定期执行, 支持执行定时性或周期性任务。
public static ExecutorService newWorkStealingPool(int parallelism) 创建一个维护足够的线程以支持给定的并行级别的线程池,并且可以使用多个队列来减少争用。 ( jdk1.8版本新增的方法 )

三、常用方法示例

3.1 FixedThreadPool方法

FixedThreadPool被称为可重用固定线程数的线程池。

特点:只有核心线程,线程数量固定,执行完立即回收,任务队列为链表结构的无界队列。

应用场景:控制线程最大并发数。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
* @author xiaowei
* @date 2022-11-18
* @description 通过newFixedThreadPool获取线程池对象
* 可指定创建线程数,并且可以重复用
**/
public class ThreadPoolDemo02 {
public static void main(String[] args) {
test1();
// test2();
}
//练习newFixedThreadPool方法
private static void test1() {
//1.使用工厂类获取线程池对象
ExecutorService es = Executors.newFixedThreadPool(3);
//2.提交任务
for (int i = 1; i <= 10; i++) {
es.submit(new MyRunnable2(i));
}
}
private static void test2() {
//1.使用工厂类获取线程池对象
ExecutorService es = Executors.newFixedThreadPool(3,new ThreadFactory() {
int n = 1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r,"自定义的线程名称" + n++);
}
});
//2.提交任务
for (int i = 1; i <= 10; i++) {
es.submit(new MyRunnable2(i));
}
}
}

/*
* 任务类,包含一个任务编号,在任务中打印出是哪一个线程正在执行任务
* */
class MyRunnable2 implements Runnable{
private int id;
public MyRunnable2(int id) {
this.id = id;
}

@Override
public void run() {
//获取线程的名称,打印一句话
String name = Thread.currentThread().getName();
System.out.println(name + "执行了任务" + id);
}
}

源码

1
2
3
4
5
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
  • FixedThreadPool的corePoolSize和maximumPoolSize都被设置为创建FixedThreadPool时指定的参数nThreads。
  • 当线程池中的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止。
  • 这里把keepAliveTime设置为0L,意味着多余的空闲线程会被立即终止。
  • FixedThreadPool使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)。

运行示意图

image-20230330195411411

说明

  1. 如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务。
  2. 在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入LinkedBlockingQueue。
  3. 线程执行完任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行。

使用无界队列为工作队列会对线程池带来如下影响。

  1. 当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中的线程数不会超过corePoolSize。
  2. 由于1使用无界队列时maximumPoolSize是一个无效的参数。
  3. 由于1和2使用无界队列时keepAliveTime是一个无效的参数。
  4. 使用无界队列,运行中的FixedThreadPool(未执行方法shutdown()或shutdownNow())不会拒绝任务(不会调用RejectedExecutionHandler.rejectedExecution方法)。

3.2 SingleThreadExecutor方法

SingleThreadExecutor是使用单个worker线程的Executor。

特点:只有 1 个核心线程,无非核心线程,执行完立即回收,任务队列为链表结构的无界队列。

应用场景:不适合并发但可能引起 IO 阻塞性及影响 UI 线程响应的操作,如数据库操作、文件操作等。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
* @author xiaowei
* @date 2022-11-18
* @description 通过newSingleThreadExecutor获取线程池对象
* 只会创建一个线程
**/
public class ThreadPoolDemo03 {
public static void main(String[] args) {
// test1();
test2();
}

//练习newSingleThreadExecutor方法
private static void test1() {
//1.使用工厂类获取线程池对象
ExecutorService es = Executors.newSingleThreadExecutor();
//2.提交任务
for (int i = 1; i <= 10; i++) {
es.submit(new MyRunnable3(i));
}
}
private static void test2() {
//1.使用工厂类获取线程池对象
ExecutorService es = Executors.newSingleThreadExecutor(new ThreadFactory() {
int n = 1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r,"自定义的线程名称" + n++);
}
});
//2.提交任务
for (int i = 1; i <= 10; i++) {
es.submit(new MyRunnable3(i));
}
}
}
/**
* 任务类,包含一个任务编号,在任务中打印出是那一个线程正在执行任务
*/
class MyRunnable3 implements Runnable{
private int id;
public MyRunnable3(int id) {
this.id = id;
}

@Override
public void run() {
//获取线程的名称,打印一句话
String name = Thread.currentThread().getName();
System.out.println(name + "执行了任务" + id);
}
}

源码

1
2
3
4
5
6
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

说明

  • SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1,其他参数与FixedThreadPool相同。
  • SingleThreadExecutor使用无界队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)。
  • 当线程正在执行任务,新任务会被加入到LinkedBlockingQueue队列中,任务加入队列的速度远大于核心线程处理的能力时,无界队列会一直增大到最大值,可能导致OOM

SingleThreadExecutor使用无界队列作为工作队列对线程池带来的影响和FixedThreadPool相同。

  1. 如果当前运行的线程数少于corePoolSize(即线程池中无运行的线程),则创建一个新线程来执行任务。
  2. 在线程池完成预热之后(当前线程池中有一个运行的线程),将任务加入LinkedBlockingQueue。
  3. 线程执行完1中任务后,会在一个无限循环中反复从LinkedBlockingQueue获取任务来执行。

3.3 CachedThreadPool方法

CachedThreadPool是一个会根据需要创建新线程的线程池。

特点:无核心线程,非核心线程数量无限,执行完闲置 60s 后回收,任务队列为不存储元素的阻塞队列。

应用场景:执行大量、耗时少的任务。

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;

/**
* @author xiaowei
* @date 2022-11-18
* @description 通过newCachedThreadPool获取线程池对象
* 练习ExecuTors获取ExecutorService,然后调用方法,提交任务
**/
public class ThreadPoolDemo01 {
public static void main(String[] args) {
// test1();
test2();
}

//newCachedThreadPool方法
private static void test1(){
// 1.使用工厂类获取线程对象
ExecutorService executorService = Executors.newCachedThreadPool();
// 2.提交任务
for (int i = 1; i <= 10 ; i++ ){
executorService.submit(new MyRunnable(i));
}
}

//练习newCachedThreadPool(ThreadFactory threadFactory)方法
private static void test2() {
//1.使用工厂类获取线程池对象
ExecutorService executorService = Executors.newCachedThreadPool(new ThreadFactory() {
int n = 1;
@Override
public Thread newThread(Runnable r) {
return new Thread(r,"自定义的线程名称" + n++);
}
});
//2.提交任务
for (int i = 1; i <= 10; i++) {
executorService.submit(new MyRunnable(i));
}
}
}

/**
* 任务类,包含一个任务编号,在任务中打印出是哪一个线程正在执行任务
* */
class MyRunnable implements Runnable {

private int id;

public MyRunnable(int id) {
this.id = id;
}

@Override
public void run() {
//获取线程的名称,打印一句话
String name = Thread.currentThread().getName();
System.out.println(name + "执行了任务" + id);
}
}

源码

1
2
3
4
5
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

说明

  • CoachedThreadPool的corePoolSize被设置为0,即corePool为空。

  • maximumPoolSize被设置为Integer.MAX_VALUE,即maximumPool是无界的。

  • 这里把keepAliveTime设置为60L,意味着CacheThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。

  • CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但CacheThreadPool的maximumPool是无界的。

    这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。

    极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。

运行示意图

image-20230330195441852

说明

  1. 首先执行SynchronousQueue.offer(Runnable task)。如果当前maximumPool中有空闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成;否则执行下面的步骤2。
  2. 当初始maximumPool为空,或者maximumPool中当前没有空闲线程时,将没有线程执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤1将失败。此时CachedThreadPool会创建一个新线程执行任务,execute()方法执行完成。
  3. 在步骤2中新创建的线程将任务执行完后,会执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这个poll操作会让空闲线程最多在SynchronousQueue中等待60秒。如果60秒钟内主线程提交了一个新任务(主线程执行步骤1),那么空闲线程将执行主线程提交的新任务;否则,这个空闲线程将终止。由于空闲60秒的空闲线程会被终止,因此长时间保存空闲的CachedThreadPool不会使用任何资源。

SynchronousQueue阻塞队列

SynchronousQueue是一个没有容量的阻塞队列。每个插入操作必须等待另一个线程的对应移除操作。CachedThreadPool使用SynchronousQueue,把主线程提交的任务传递给空闲线程执行。CachedThreadPool中任务传递的示意图。

image-20230330195504123

总结

  • newCachedThreadPool:线程的数据是不做限制的,每次有任务来的时候都会以任务优先,性能最大化(也就是服务器压力比较大)
  • newFixedThreadPool:可以让压力不那么大,并且可以规定线程的数量,当线程的数量达到指定数量的时候,这个时候就不会再有新的线程了
  • newSingleThreadExecutor:绝对的安全,不考虑性能,因为是单线程,永远只有一个线程来执行任务。

Executors方法创建线程池存在的问题

目前已不推荐使用Executors方法创建线程池,而是通过自定义 ThreadPoolExecutor 的方式。因为直接使用Executors方法创建线程池具有资源耗尽的风险。

  • newFixedThreadPoolnewSingleThreadExecutor: 主要问题是堆积的请求处理队列均采用 LinkedBlockingQueue,可能会耗费非常大的内存,甚至 OOM。
  • newCachedThreadPoolnewScheduledThreadPool: 主要问题是线程数最大数是 Integer.MAX_VALUE,可能会创建数量非常多的线程,甚至 OOM。

Executors工具类介绍
http://example.com/2023/03/30/Executors工具类介绍/
作者
程序员小魏
发布于
2023年3月30日
许可协议